mz_orchestrator_process/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Context, anyhow, bail};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::{BoxStream, FuturesUnordered};
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use maplit::btreemap;
use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
use mz_orchestrator::{
    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service,
    ServiceAssignments, ServiceConfig, ServiceEvent, ServicePort, ServiceProcessMetrics,
    ServiceStatus,
};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use scopeguard::defer;
use serde::Serialize;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
use tokio::fs::remove_dir_all;
use tokio::net::{TcpListener, UnixStream};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::{fs, io, select};
use tracing::{debug, error, info, warn};

pub mod secrets;

/// Configures a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    /// The directory in which the orchestrator should look for executable
    /// images.
    pub image_dir: PathBuf,
    /// Whether to suppress output from spawned subprocesses.
    pub suppress_output: bool,
    /// The ID of the environment under orchestration.
    pub environment_id: String,
    /// The directory in which to store secrets.
    pub secrets_dir: PathBuf,
    /// A command to wrap the child command invocation.
    pub command_wrapper: Vec<String>,
    /// Whether to crash this process if a child process crashes.
    pub propagate_crashes: bool,
    /// TCP proxy configuration.
    ///
    /// When enabled, for each named port of each created service, the process
    /// orchestrator will bind a TCP listener that proxies incoming connections
    /// to the underlying Unix domain socket. Each bound TCP address will be
    /// emitted as a tracing event.
    ///
    /// The primary use is live debugging the running child services via tools
    /// that do not support Unix domain sockets (e.g., Prometheus, web
    /// browsers).
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    /// A scratch directory that orchestrated processes can use for ephemeral storage.
    pub scratch_directory: PathBuf,
}

/// Configures the TCP proxy for a [`ProcessOrchestrator`].
///
/// See [`ProcessOrchestratorConfig::tcp_proxy`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    /// The IP address on which to bind TCP listeners.
    pub listen_addr: IpAddr,
    /// A directory in which to write Prometheus scrape targets, for use with
    /// Prometheus's file-based service discovery.
    ///
    /// Each [`NamespacedOrchestrator`] will maintain a single JSON file in
    /// the directory, named `NAMESPACE.json`, containing the scrape targets
    /// for all extant services. The scrape targets will use the TCP proxy
    /// address, as Prometheus does not support scraping over Unix domain
    /// sockets.
    ///
    /// See also: <https://prometheus.io/docs/guides/file-sd/>
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}

/// An orchestrator backed by processes on the local machine.
///
/// **This orchestrator is for development only.** Due to limitations in the
/// Unix process API, it does not exactly conform to the documented semantics
/// of `Orchestrator`.
#[derive(Debug)]
pub struct ProcessOrchestrator {
    image_dir: PathBuf,
    suppress_output: bool,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    /// Directly execute the provided binary.
    Direct,
    /// Use systemd to start the binary.
    Systemd,
}

impl LaunchSpec {
    fn determine_implementation() -> Result<Self, anyhow::Error> {
        // According to https://www.freedesktop.org/software/systemd/man/latest/sd_booted.html,
        // checking for `/run/systemd/system/` is the canonical way to determine whether the
        // system was booted with systemd.
        match Path::new("/run/systemd/system/").try_exists()? {
            true => Ok(Self::Systemd),
            false => Ok(Self::Direct),
        }
    }

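    /// Builds the [`Command`] used to launch a service process, applying the
    /// configured command wrapper and resource limits.
    ///
    /// A sketch of the resulting invocation for [`LaunchSpec::Systemd`]; the
    /// 1 GiB memory limit and the `numactl` wrapper are illustrative, not
    /// defaults:
    ///
    /// ```text
    /// systemd-run --user --scope --quiet -p MemoryMax=1073741824 \
    ///     numactl --interleave=all <image> <args>...
    /// ```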
    fn refine_command(
        &self,
        image: impl AsRef<OsStr>,
        args: &[impl AsRef<OsStr>],
        wrapper: &[String],
        memory_limit: Option<&MemoryLimit>,
        cpu_limit: Option<&CpuLimit>,
    ) -> Command {
        let mut cmd = match self {
            Self::Direct => {
                if let Some((program, wrapper_args)) = wrapper.split_first() {
                    let mut cmd = Command::new(program);
                    cmd.args(wrapper_args);
                    cmd.arg(image);
                    cmd
                } else {
                    Command::new(image)
                }
            }
            Self::Systemd => {
                let mut cmd = Command::new("systemd-run");
                cmd.args(["--user", "--scope", "--quiet"]);
                if let Some(memory_limit) = memory_limit {
                    let memory_limit = memory_limit.0.as_u64();
                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
                    // TODO: We could set `-p MemorySwapMax=0` here to disable regular swap.
                }
                if let Some(cpu_limit) = cpu_limit {
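                    // systemd's CPUQuota is expressed as a percentage of a
                    // single CPU, so 1000 millicpus correspond to 100%. The
                    // `+ 9` rounds up: e.g., 2500 millicpus become 250%, and
                    // 1 millicpu yields 1% rather than 0%.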
                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
                }

                cmd.args(wrapper);
                cmd.arg(image);
                cmd
            }
        };
        cmd.args(args);
        cmd
    }
}

impl ProcessOrchestrator {
    /// Creates a new process orchestrator from the provided configuration.
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}

impl Orchestrator for ProcessOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                _worker: worker,
            })
        }))
    }
}

/// Configuration for a [`NamespacedProcessOrchestrator`].
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

impl NamespacedProcessOrchestratorConfig {
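    /// Returns the namespaced identifier for a service, used to name its run
    /// and scratch directories. A sketch with illustrative values: with
    /// namespace `"default"`, `full_id("u1")` returns `"default-u1"`.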
    fn full_id(&self, id: &str) -> String {
        format!("{}-{}", self.namespace, id)
    }

    fn service_run_dir(&self, id: &str) -> PathBuf {
        self.metadata_dir.join(&self.full_id(id))
    }

    fn service_scratch_dir(&self, id: &str) -> PathBuf {
        self.scratch_directory.join(&self.full_id(id))
    }
}

#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    _worker: AbortOnDropHandle<()>,
}

impl NamespacedProcessOrchestrator {
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let service = ProcessService {
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
        };

        // Enable disk unless the requested disk limit explicitly disables it.
        let disk = config.disk_limit != Some(DiskLimit::ZERO);

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        let mut initial_events = vec![];
        let mut service_event_rx = {
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result include a `result_tx` on which the
/// [`OrchestratorWorker`] will deliver the result.
enum WorkerCommand {
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    DropService {
        id: String,
    },
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}

/// Describes the desired state of a process.
struct EnsureServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port.
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// Whether scratch disk space should be allocated for the service.
    pub disk: bool,
}

/// A task executing blocking work for a [`NamespacedProcessOrchestrator`] in the background.
///
/// This type exists to enable making [`NamespacedProcessOrchestrator::ensure_service`] and
/// [`NamespacedProcessOrchestrator::drop_service`] non-blocking, allowing invocation of these
/// methods in latency-sensitive contexts.
///
/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
/// orchestrator calls that query service state (such as `list_services`). These need to be
/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
/// ensure that a `list_services` result contains exactly those services that were previously
/// created with `ensure_service` and not yet dropped with `drop_service`.
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}

impl OrchestratorWorker {
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
                            // Justification for `unwrap`:
                            //
                            // `u64::try_cast_from(f: f64)`
                            // will always succeed if 0 <= f < 2^64.
                            // Since the max value of `process.cpu_usage()` is
                            // 100.0 * num_of_cores, this will be true whenever there
                            // are fewer than 2^64 / 10^9 logical cores, or about
                            // 18 billion.
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                // The process orchestrator does not support the remaining fields right now.
                disk_bytes: None,
                heap_bytes: None,
                heap_limit: None,
            });
        }
        Ok(metrics)
    }

    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        // The service might already exist. If it has the same config as requested (currently we
        // check only the scale), we have nothing to do. Otherwise we need to drop and recreate it.
        let old_scale = {
            let services = self.services.lock().expect("poisoned");
            services.get(&id).map(|states| states.len())
        };
        match old_scale {
            Some(old) if old == usize::from(scale) => return Ok(()),
            Some(_) => self.drop_service(&id).await?,
            None => (),
        }

        // Create sockets for all processes in the service.
        let mut peer_addrs = Vec::new();
        for i in 0..scale.into() {
            let addresses = ports_in
                .iter()
                .map(|port| {
                    let addr = socket_path(&run_dir, &port.name, i);
                    (port.name.clone(), addr)
                })
                .collect();
            peer_addrs.push(addresses);
        }

        {
            let mut services = self.services.lock().expect("lock poisoned");

            // Create the state for new processes.
            let mut process_states = vec![];
            for i in 0..scale.into() {
                let listen_addrs = &peer_addrs[i];

                // Fill out placeholders in the command wrapper for this process.
                let mut command_wrapper = self.config.command_wrapper.clone();
                if let Some(parts) = command_wrapper.get_mut(1..) {
                    for part in parts {
                        *part = interpolate_command(&part[..], &full_id, listen_addrs);
                    }
                }

                // Allocate listeners for each TCP proxy, if requested.
                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        listen_addr: listen_addrs[&port.name].clone(),
                        tcp_proxy_listener,
                    });
                }

                let mut args = args(ServiceAssignments {
                    listen_addrs,
                    peer_addrs: &peer_addrs,
                });
                args.push(format!("--process={i}"));
                if disk {
                    if let Some(scratch) = &scratch_dir {
                        args.push(format!("--scratch-directory={}", scratch.display()));
                    } else {
                        panic!(
                            "internal error: service requested disk but no scratch directory was configured"
                        );
                    }
                }

                // Launch supervisor process.
                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        i,
                        image: image.clone(),
                        args,
                        command_wrapper,
                        ports,
                        memory_limit,
                        cpu_limit,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            // Update the in-memory process state. We do this after we've created
            // all process states to avoid partially updating our in-memory state.
            services.insert(id, process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        // Drop the supervisor for the service, if it exists. If this service
        // was under supervision, this will kill all processes associated with
        // it.
        {
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        // If the service was orphaned by a prior incarnation of the
        // orchestrator, it won't have been under supervision and therefore will
        // still be running. So kill any process that we have state for in the
        // run directory.
        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        // Clean up the on-disk state of the service.
        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            i,
            image,
            args,
            command_wrapper,
            ports,
            memory_limit,
            cpu_limit,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        async move {
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = port.listen_addr;
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            supervise_existing_process(&state_updater, &pid_file).await;

            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        if propagate_crashes && did_process_crash(status) {
                            panic!(
                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                            );
                        }
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }
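        // When serialized, each `StaticConfig` produces one entry in
        // Prometheus's file-based service discovery format, e.g. (values
        // illustrative):
        //
        //     [{"labels": {"mz_orchestrator_namespace": "default", ...},
        //       "targets": ["127.0.0.1:54321"]}]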

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}

struct ServiceProcessConfig {
    id: String,
    run_dir: PathBuf,
    i: usize,
    image: String,
    args: Vec<String>,
    command_wrapper: Vec<String>,
    ports: Vec<ServiceProcessPort>,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}

struct ServiceProcessPort {
    name: String,
    listen_addr: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}

/// Supervises an existing process, if one exists.
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    // Kill the process if the future is dropped.
    let need_kill = AtomicBool::new(true);
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    // Periodically check whether the process has terminated.
    let mut system = System::new();
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    // The process has crashed. Exit the function without attempting to
    // kill it.
    warn!(%pid, "process for {name} has crashed; will reboot");
    need_kill.store(false, Ordering::SeqCst)
}

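/// Expands placeholders in one fragment of the command wrapper: `%N` becomes
/// the full service ID and `%P:NAME` becomes the listen address of the port
/// named `NAME`.
///
/// A sketch with illustrative values:
///
/// ```text
/// interpolate_command("--listen=%P:sql", "default-u1", &ports)
///     // => "--listen=/tmp/environmentd-.../default-u1/sql-0"
/// ```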
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let mut command_part = command_part.replace("%N", full_id);
    for (endpoint, port) in ports {
        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
    }
    command_part
}

async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
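    // Wraps the child so that dropping this future tears it down. When the
    // flag is set (it is set whenever a command wrapper is in use), the child
    // is first sent SIGTERM and given a moment to react before being forcibly
    // killed.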
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                // Give the process a bit of time to react to the signal
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    // Immediately write out a file containing the PID of the child process and
    // its start time. We'll use this state to rediscover our children if we
    // crash and restart. There's a very small window where we can crash after
    // having spawned the child but before writing this file, in which case we
    // might orphan the process. We accept this risk, though. It's hard to do
    // anything more robust given the Unix APIs available to us, and the
    // solution here is good enough given that the process orchestrator is only
    // used in development/testing.
    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}

fn did_process_crash(status: ExitStatus) -> bool {
    // Likely not exhaustive. Feel free to add additional tests for other
    // indications of a crashed child process, as those conditions are
    // discovered.
    matches!(
        status.signal(),
        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
    )
}

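/// Writes a PID file recording the PID and start time of the given process.
///
/// The file contains the two values on separate lines, e.g. (values
/// illustrative):
///
/// ```text
/// 12345
/// 1700000000
/// ```
///
/// Recording the start time lets [`find_process_from_pid_file`] reject a PID
/// that has been reused by an unrelated process.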
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}

async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    // Checking the start time protects against killing an unrelated process due
    // to PID reuse.
    if process.start_time() != start_time {
        return None;
    }
    Some(process)
}

struct TcpProxyConfig {
    name: String,
    tcp_listener: AddressedTcpListener,
    uds_path: String,
}

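/// Proxies connections accepted on the TCP listener to the Unix domain
/// socket at `uds_path`, copying bytes in both directions until either side
/// closes. Failures of individual connections are logged without stopping
/// the proxy loop.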
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            Some(Err(e)) = conns.next() => {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}

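/// Updates the status of a single service process in the orchestrator's
/// in-memory state and broadcasts the change as a [`ServiceEvent`].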
struct ProcessStateUpdater {
    namespace: String,
    id: String,
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}

impl ProcessStateUpdater {
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}

#[derive(Debug)]
struct ProcessState {
    _handle: AbortOnDropHandle<()>,
    status: ProcessStatus,
    status_time: DateTime<Utc>,
    labels: BTreeMap<String, String>,
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}

impl ProcessState {
    fn pid(&self) -> Option<Pid> {
        match &self.status {
            ProcessStatus::NotReady => None,
            ProcessStatus::Ready { pid } => Some(*pid),
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    NotReady,
    Ready { pid: Pid },
}

impl From<ProcessStatus> for ServiceStatus {
    fn from(status: ProcessStatus) -> ServiceStatus {
        match status {
            ProcessStatus::NotReady => ServiceStatus::Offline(None),
            ProcessStatus::Ready { .. } => ServiceStatus::Online,
        }
    }
}

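/// Returns the path of the Unix domain socket for the given port of the
/// `process`th process, rooted in `run_dir`.
///
/// A sketch with illustrative values:
///
/// ```text
/// socket_path(Path::new("/run/dir"), "sql", 0)  // => "/run/dir/sql-0"
/// ```
///
/// When that path would exceed the platform's socket address length limit,
/// the function falls back to a hashed name under the system temporary
/// directory, as described in the comment below.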
fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
    let desired = run_dir
        .join(format!("{port}-{process}"))
        .to_string_lossy()
        .into_owned();
    if UnixSocketAddr::from_pathname(&desired).is_err() {
        // Unix socket addresses have a very low maximum length of around 100
        // bytes on most platforms.
        env::temp_dir()
            .join(hex::encode(Sha1::digest(desired)))
            .display()
            .to_string()
    } else {
        desired
    }
}

struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}

#[derive(Debug, Clone)]
struct ProcessService {
    run_dir: PathBuf,
    scale: u16,
}

impl Service for ProcessService {
    fn addresses(&self, port: &str) -> Vec<String> {
        (0..self.scale)
            .map(|i| socket_path(&self.run_dir, port, i.into()))
            .collect()
    }
}