
mz_orchestrator_process/lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::env;
12use std::ffi::OsStr;
13use std::fmt::Debug;
14use std::fs::Permissions;
15use std::future::Future;
16use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
17use std::num::NonZero;
18use std::os::unix::fs::PermissionsExt;
19use std::os::unix::process::ExitStatusExt;
20use std::path::{Path, PathBuf};
21use std::process::{ExitStatus, Stdio};
22use std::str::FromStr;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::{Arc, Mutex};
25
26use anyhow::{Context, anyhow, bail};
27use async_stream::stream;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures::StreamExt;
31use futures::stream::{BoxStream, FuturesUnordered};
32use itertools::Itertools;
33use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
34use maplit::btreemap;
35use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
36use mz_orchestrator::{
37    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service,
38    ServiceAssignments, ServiceConfig, ServiceEvent, ServicePort, ServiceProcessMetrics,
39    ServiceStatus,
40};
41use mz_ore::cast::{CastFrom, TryCastFrom};
42use mz_ore::error::ErrorExt;
43use mz_ore::netio::UnixSocketAddr;
44use mz_ore::result::ResultExt;
45use mz_ore::task::AbortOnDropHandle;
46use scopeguard::defer;
47use serde::Serialize;
48use sha1::{Digest, Sha1};
49use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
50use tokio::fs::remove_dir_all;
51use tokio::net::{TcpListener, UnixStream};
52use tokio::process::{Child, Command};
53use tokio::sync::{broadcast, mpsc, oneshot};
54use tokio::time::{self, Duration};
55use tokio::{fs, io, select};
56use tracing::{debug, error, info, warn};
57
58pub mod secrets;
59
60/// Configures a [`ProcessOrchestrator`].
61#[derive(Debug, Clone)]
62pub struct ProcessOrchestratorConfig {
63    /// The directory in which the orchestrator should look for executable
64    /// images.
65    pub image_dir: PathBuf,
66    /// Whether to suppress output from spawned subprocesses.
67    pub suppress_output: bool,
68    /// The ID of the environment under orchestration.
69    pub environment_id: String,
70    /// The directory in which to store secrets.
71    pub secrets_dir: PathBuf,
73    /// A command used to wrap the child command invocation.
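    /// The wrapper's arguments (but not its program) may reference `%N` (the
    /// full service ID) and `%P:<port>` (the listen address of the named
    /// port); see `interpolate_command` below.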
73    pub command_wrapper: Vec<String>,
74    /// Whether to crash this process if a child process crashes.
75    pub propagate_crashes: bool,
76    /// TCP proxy configuration.
77    ///
78    /// When enabled, for each named port of each created service, the process
79    /// orchestrator will bind a TCP listener that proxies incoming connections
80    /// to the underlying Unix domain socket. Each bound TCP address will be
81    /// emitted as a tracing event.
82    ///
83    /// The primary use is live debugging the running child services via tools
84    /// that do not support Unix domain sockets (e.g., Prometheus, web
85    /// browsers).
86    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
87    /// A scratch directory that orchestrated processes can use for ephemeral storage.
88    pub scratch_directory: PathBuf,
89}
90
91/// Configures the TCP proxy for a [`ProcessOrchestrator`].
92///
93/// See [`ProcessOrchestratorConfig::tcp_proxy`].
94#[derive(Debug, Clone)]
95pub struct ProcessOrchestratorTcpProxyConfig {
96    /// The IP address on which to bind TCP listeners.
97    pub listen_addr: IpAddr,
98    /// A directory in which to write Prometheus scrape targets, for use with
99    /// Prometheus's file-based service discovery.
100    ///
101    /// Each [`NamespacedOrchestrator`] will maintain a single JSON file into
102    /// the directory named `NAMESPACE.json` containing the scrape targets for
103    /// all extant services. The scrape targets will use the TCP proxy address,
104    /// as Prometheus does not support scraping over Unix domain sockets.
105    ///
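    /// For illustration only (hypothetical service ID and address), a
    /// discovery file might contain an entry such as:
    ///
    /// ```json
    /// [
    ///   {
    ///     "labels": {
    ///       "mz_orchestrator_namespace": "cluster",
    ///       "mz_orchestrator_service_id": "u1-replica-1",
    ///       "mz_orchestrator_port": "internal-http",
    ///       "mz_orchestrator_ordinal": "0"
    ///     },
    ///     "targets": ["127.0.0.1:34213"]
    ///   }
    /// ]
    /// ```
    ///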
106    /// See also: <https://prometheus.io/docs/guides/file-sd/>
107    pub prometheus_service_discovery_dir: Option<PathBuf>,
108}
109
110/// An orchestrator backed by processes on the local machine.
111///
112/// **This orchestrator is for development only.** Due to limitations in the
113/// Unix process API, it does not exactly conform to the documented semantics
114/// of `Orchestrator`.
115#[derive(Debug)]
116pub struct ProcessOrchestrator {
117    image_dir: PathBuf,
118    suppress_output: bool,
119    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
120    metadata_dir: PathBuf,
121    secrets_dir: PathBuf,
122    command_wrapper: Vec<String>,
123    propagate_crashes: bool,
124    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
125    scratch_directory: PathBuf,
126    launch_spec: LaunchSpec,
127}
128
129#[derive(Debug, Clone, Copy)]
130enum LaunchSpec {
131    /// Directly execute the provided binary
132    Direct,
133    /// Use Systemd to start the binary
134    Systemd,
135}
136
137impl LaunchSpec {
138    fn determine_implementation() -> Result<Self, anyhow::Error> {
139        // According to https://www.freedesktop.org/software/systemd/man/latest/sd_booted.html
140        // checking for `/run/systemd/system/` is the canonical way to determine if the system
141        // was booted up with systemd.
142        match Path::new("/run/systemd/system/").try_exists()? {
143            true => Ok(Self::Systemd),
144            false => Ok(Self::Direct),
145        }
146    }
147
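    /// Builds the [`Command`] used to launch a service process.
    ///
    /// As a rough illustration (assuming byte-denominated memory limits):
    /// under the `Systemd` spec, a memory limit of 4GiB and a CPU limit of
    /// 2500 millicpus would yield approximately
    ///
    /// ```text
    /// systemd-run --user --scope --quiet \
    ///     -p MemoryMax=4294967296 -p CPUQuota=250% \
    ///     <wrapper...> <image> <args...>
    /// ```
    ///
    /// while the `Direct` spec executes `<wrapper...> <image> <args...>` as is.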
148    fn refine_command(
149        &self,
150        image: impl AsRef<OsStr>,
151        args: &[impl AsRef<OsStr>],
152        wrapper: &[String],
153        memory_limit: Option<&MemoryLimit>,
154        cpu_limit: Option<&CpuLimit>,
155    ) -> Command {
156        let mut cmd = match self {
157            Self::Direct => {
158                if let Some((program, wrapper_args)) = wrapper.split_first() {
159                    let mut cmd = Command::new(program);
160                    cmd.args(wrapper_args);
161                    cmd.arg(image);
162                    cmd
163                } else {
164                    Command::new(image)
165                }
166            }
167            Self::Systemd => {
168                let mut cmd = Command::new("systemd-run");
169                cmd.args(["--user", "--scope", "--quiet"]);
170                if let Some(memory_limit) = memory_limit {
171                    let memory_limit = memory_limit.0.as_u64();
172                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
173                    // TODO: We could set `-p MemorySwapMax=0` here to disable regular swap.
174                }
175                if let Some(cpu_limit) = cpu_limit {
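                    // Convert millicpus to a CPU quota percentage, rounding up:
                    // 1000 millicpus == 100% of one core.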
176                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
177                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
178                }
179
180                cmd.args(wrapper);
181                cmd.arg(image);
182                cmd
183            }
184        };
185        cmd.args(args);
186        cmd
187    }
188}
189
190impl ProcessOrchestrator {
191    /// Creates a new process orchestrator from the provided configuration.
192    pub async fn new(
193        ProcessOrchestratorConfig {
194            image_dir,
195            suppress_output,
196            environment_id,
197            secrets_dir,
198            command_wrapper,
199            propagate_crashes,
200            tcp_proxy,
201            scratch_directory,
202        }: ProcessOrchestratorConfig,
203    ) -> Result<ProcessOrchestrator, anyhow::Error> {
204        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
205        fs::create_dir_all(&metadata_dir)
206            .await
207            .context("creating metadata directory")?;
208        fs::create_dir_all(&secrets_dir)
209            .await
210            .context("creating secrets directory")?;
211        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
212            .await
213            .context("setting secrets directory permissions")?;
214        if let Some(prometheus_dir) = tcp_proxy
215            .as_ref()
216            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
217        {
218            fs::create_dir_all(&prometheus_dir)
219                .await
220                .context("creating prometheus directory")?;
221        }
222
223        let launch_spec = LaunchSpec::determine_implementation()?;
224        info!(driver = ?launch_spec, "Process orchestrator launch spec");
225
226        Ok(ProcessOrchestrator {
227            image_dir: fs::canonicalize(image_dir).await?,
228            suppress_output,
229            namespaces: Mutex::new(BTreeMap::new()),
230            metadata_dir: fs::canonicalize(metadata_dir).await?,
231            secrets_dir: fs::canonicalize(secrets_dir).await?,
232            command_wrapper,
233            propagate_crashes,
234            tcp_proxy,
235            scratch_directory,
236            launch_spec,
237        })
238    }
239}
240
241impl Orchestrator for ProcessOrchestrator {
242    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
243        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
244        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
245            let config = Arc::new(NamespacedProcessOrchestratorConfig {
246                namespace: namespace.into(),
247                image_dir: self.image_dir.clone(),
248                suppress_output: self.suppress_output,
249                metadata_dir: self.metadata_dir.clone(),
250                command_wrapper: self.command_wrapper.clone(),
251                propagate_crashes: self.propagate_crashes,
252                tcp_proxy: self.tcp_proxy.clone(),
253                scratch_directory: self.scratch_directory.clone(),
254                launch_spec: self.launch_spec,
255            });
256
257            let services = Arc::new(Mutex::new(BTreeMap::new()));
258            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
259            let (command_tx, command_rx) = mpsc::unbounded_channel();
260
261            let worker = OrchestratorWorker {
262                config: Arc::clone(&config),
263                services: Arc::clone(&services),
264                service_event_tx,
265                system: System::new(),
266                command_rx,
267            }
268            .spawn();
269
270            Arc::new(NamespacedProcessOrchestrator {
271                config,
272                services,
273                service_event_rx,
274                command_tx,
275                scheduling_config: Default::default(),
276                _worker: worker,
277            })
278        }))
279    }
280}
281
282/// Configuration for a [`NamespacedProcessOrchestrator`].
283#[derive(Debug)]
284struct NamespacedProcessOrchestratorConfig {
285    namespace: String,
286    image_dir: PathBuf,
287    suppress_output: bool,
288    metadata_dir: PathBuf,
289    command_wrapper: Vec<String>,
290    propagate_crashes: bool,
291    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
292    scratch_directory: PathBuf,
293    launch_spec: LaunchSpec,
294}
295
296impl NamespacedProcessOrchestratorConfig {
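    /// Returns the fully namespaced ID for a service. Illustratively, a
    /// namespace of `cluster` and a service ID of `u1` yield `cluster-u1`.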
297    fn full_id(&self, id: &str) -> String {
298        format!("{}-{}", self.namespace, id)
299    }
300
301    fn service_run_dir(&self, id: &str) -> PathBuf {
302        self.metadata_dir.join(&self.full_id(id))
303    }
304
305    fn service_scratch_dir(&self, id: &str) -> PathBuf {
306        self.scratch_directory.join(&self.full_id(id))
307    }
308}
309
310#[derive(Debug)]
311struct NamespacedProcessOrchestrator {
312    config: Arc<NamespacedProcessOrchestratorConfig>,
313    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
314    service_event_rx: broadcast::Receiver<ServiceEvent>,
315    command_tx: mpsc::UnboundedSender<WorkerCommand>,
316    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
317    _worker: AbortOnDropHandle<()>,
318}
319
320impl NamespacedProcessOrchestrator {
321    fn send_command(&self, cmd: WorkerCommand) {
322        self.command_tx.send(cmd).expect("worker task not dropped");
323    }
324}
325
326#[async_trait]
327impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
328    fn ensure_service(
329        &self,
330        id: &str,
331        config: ServiceConfig,
332    ) -> Result<Box<dyn Service>, anyhow::Error> {
333        let service = ProcessService {
334            id: id.to_string(),
335            run_dir: self.config.service_run_dir(id),
336            scale: config.scale,
337            services: Arc::clone(&self.services),
338        };
339
340        // Enable disk unless the configured disk limit explicitly disables it.
341        let disk = config.disk_limit != Some(DiskLimit::ZERO);
342
343        let config = EnsureServiceConfig {
344            image: config.image,
345            args: config.args,
346            ports: config.ports,
347            memory_limit: config.memory_limit,
348            cpu_limit: config.cpu_limit,
349            scale: config.scale,
350            labels: config.labels,
351            disk,
352        };
353
354        self.send_command(WorkerCommand::EnsureService {
355            id: id.to_string(),
356            config,
357        });
358
359        Ok(Box::new(service))
360    }
361
362    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
363        self.send_command(WorkerCommand::DropService { id: id.to_string() });
364        Ok(())
365    }
366
367    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
368        let (result_tx, result_rx) = oneshot::channel();
369        self.send_command(WorkerCommand::ListServices { result_tx });
370
371        result_rx.await.expect("worker task not dropped")
372    }
373
374    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
375        let mut initial_events = vec![];
376        let mut service_event_rx = {
377            let services = self.services.lock().expect("lock poisoned");
378            for (service_id, process_states) in &*services {
379                for (process_id, process_state) in process_states.iter().enumerate() {
380                    initial_events.push(ServiceEvent {
381                        service_id: service_id.clone(),
382                        process_id: u64::cast_from(process_id),
383                        status: process_state.status.into(),
384                        time: process_state.status_time,
385                    });
386                }
387            }
388            self.service_event_rx.resubscribe()
389        };
390        Box::pin(stream! {
391            for event in initial_events {
392                yield Ok(event);
393            }
394            loop {
395                yield service_event_rx.recv().await.err_into();
396            }
397        })
398    }
399
400    async fn fetch_service_metrics(
401        &self,
402        id: &str,
403    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
404        let (result_tx, result_rx) = oneshot::channel();
405        self.send_command(WorkerCommand::FetchServiceMetrics {
406            id: id.to_string(),
407            result_tx,
408        });
409
410        result_rx.await.expect("worker task not dropped")
411    }
412
413    fn update_scheduling_config(
414        &self,
415        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
416    ) {
417        *self.scheduling_config.write().expect("poisoned") = config;
418    }
419}
420
421/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
422/// [`OrchestratorWorker`].
423///
424/// Commands for which the caller expects a result include a `result_tx` on which the
425/// [`OrchestratorWorker`] will deliver the result.
426enum WorkerCommand {
427    EnsureService {
428        id: String,
429        config: EnsureServiceConfig,
430    },
431    DropService {
432        id: String,
433    },
434    ListServices {
435        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
436    },
437    FetchServiceMetrics {
438        id: String,
439        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
440    },
441}
442
443/// Describes the desired state of a service.
444struct EnsureServiceConfig {
445    /// An opaque identifier for the executable or container image to run.
446    ///
447    /// Often names a container on Docker Hub or a path on the local machine.
448    pub image: String,
449    /// A function that generates the arguments for each process of the service
450    /// given the assigned listen addresses for each named port.
451    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
452    /// Ports to expose.
453    pub ports: Vec<ServicePort>,
454    /// An optional limit on the memory that the service can use.
455    pub memory_limit: Option<MemoryLimit>,
456    /// An optional limit on the CPU that the service can use.
457    pub cpu_limit: Option<CpuLimit>,
458    /// The number of copies of this service to run.
459    pub scale: NonZero<u16>,
460    /// Arbitrary key–value pairs to attach to the service in the orchestrator
461    /// backend.
462    ///
463    /// The orchestrator backend may apply a prefix to the key if appropriate.
464    pub labels: BTreeMap<String, String>,
465    /// Whether scratch disk space should be allocated for the service.
466    pub disk: bool,
467}
468
469/// A task executing blocking work for a [`NamespacedProcessOrchestrator`] in the background.
470///
471/// This type exists to enable making [`NamespacedProcessOrchestrator::ensure_service`] and
472/// [`NamespacedProcessOrchestrator::drop_service`] non-blocking, allowing invocation of these
473/// methods in latency-sensitive contexts.
474///
475/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
476/// orchestrator calls that query service state (such as `list_services`). These need to be
477/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
478/// ensure that a `list_services` result contains exactly those services that were previously
479/// created with `ensure_service` and not yet dropped with `drop_service`.
480struct OrchestratorWorker {
481    config: Arc<NamespacedProcessOrchestratorConfig>,
482    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
483    service_event_tx: broadcast::Sender<ServiceEvent>,
484    system: System,
485    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
486}
487
488impl OrchestratorWorker {
489    fn spawn(self) -> AbortOnDropHandle<()> {
490        let name = format!("process-orchestrator:{}", self.config.namespace);
491        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
492    }
493
494    async fn run(mut self) {
495        while let Some(cmd) = self.command_rx.recv().await {
496            use WorkerCommand::*;
497            let result = match cmd {
498                EnsureService { id, config } => self.ensure_service(id, config).await,
499                DropService { id } => self.drop_service(&id).await,
500                ListServices { result_tx } => {
501                    let _ = result_tx.send(self.list_services().await);
502                    Ok(())
503                }
504                FetchServiceMetrics { id, result_tx } => {
505                    let _ = result_tx.send(self.fetch_service_metrics(&id));
506                    Ok(())
507                }
508            };
509
510            if let Err(error) = result {
511                panic!("process orchestrator worker failed: {error}");
512            }
513        }
514    }
515
516    fn fetch_service_metrics(
517        &mut self,
518        id: &str,
519    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
520        let pids: Vec<_> = {
521            let services = self.services.lock().expect("lock poisoned");
522            let Some(service) = services.get(id) else {
523                bail!("unknown service {id}")
524            };
525            service.iter().map(|p| p.pid()).collect()
526        };
527
528        let mut metrics = vec![];
529        for pid in pids {
530            let (cpu_nano_cores, memory_bytes) = match pid {
531                None => (None, None),
532                Some(pid) => {
533                    self.system
534                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
535                    match self.system.process(pid) {
536                        None => (None, None),
537                        Some(process) => {
538                            // Justification for `unwrap`:
539                            //
540                            // `u64::try_cast_from(f: f64)`
541                            // will always succeed if 0 <= f < 2^64.
542                            // Since the max value of `process.cpu_usage()` is
543                            // 100.0 * num_of_cores, this will be true whenever there
544                            // are less than 2^64 / 10^9 logical cores, or about
545                            // 18 billion.
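                            //
                            // `cpu_usage()` reports percent of a single core,
                            // so multiplying by 10^7 converts it to nanocores
                            // (100% == 10^9 nanocores == one full core).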
546                            let cpu = u64::try_cast_from(
547                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
548                            )
549                            .expect("sane value of process.cpu_usage()");
550                            let memory = process.memory();
551                            (Some(cpu), Some(memory))
552                        }
553                    }
554                }
555            };
556            metrics.push(ServiceProcessMetrics {
557                cpu_nano_cores,
558                memory_bytes,
559                // Process orchestrator does not support the remaining fields right now.
560                disk_bytes: None,
561                heap_bytes: None,
562                heap_limit: None,
563            });
564        }
565        Ok(metrics)
566    }
567
568    async fn ensure_service(
569        &self,
570        id: String,
571        EnsureServiceConfig {
572            image,
573            args,
574            ports: ports_in,
575            memory_limit,
576            cpu_limit,
577            scale,
578            labels,
579            disk,
580        }: EnsureServiceConfig,
581    ) -> Result<(), anyhow::Error> {
582        let full_id = self.config.full_id(&id);
583
584        let run_dir = self.config.service_run_dir(&id);
585        fs::create_dir_all(&run_dir)
586            .await
587            .context("creating run directory")?;
588        let scratch_dir = if disk {
589            let scratch_dir = self.config.service_scratch_dir(&id);
590            fs::create_dir_all(&scratch_dir)
591                .await
592                .context("creating scratch directory")?;
593            Some(fs::canonicalize(&scratch_dir).await?)
594        } else {
595            None
596        };
597
598        // The service might already exist. If it has the same config as requested (currently we
599        // check only the scale), we have nothing to do. Otherwise we need to drop and recreate it.
600        let old_scale = {
601            let services = self.services.lock().expect("poisoned");
602            services.get(&id).map(|states| states.len())
603        };
604        match old_scale {
605            Some(old) if old == usize::cast_from(scale) => return Ok(()),
606            Some(_) => self.drop_service(&id).await?,
607            None => (),
608        }
609
610        // Create sockets for all processes in the service.
611        let mut peer_addrs = Vec::new();
612        for i in 0..scale.into() {
613            let addresses = ports_in
614                .iter()
615                .map(|port| {
616                    let addr = socket_path(&run_dir, &port.name, i);
617                    (port.name.clone(), addr)
618                })
619                .collect();
620            peer_addrs.push(addresses);
621        }
622
623        {
624            let mut services = self.services.lock().expect("lock poisoned");
625
626            // Create the state for new processes.
627            let mut process_states = vec![];
628            for i in 0..usize::cast_from(scale) {
629                let listen_addrs = &peer_addrs[i];
630
631                // Fill out placeholders in the command wrapper for this process.
632                let mut command_wrapper = self.config.command_wrapper.clone();
633                if let Some(parts) = command_wrapper.get_mut(1..) {
634                    for part in parts {
635                        *part = interpolate_command(&part[..], &full_id, listen_addrs);
636                    }
637                }
638
639                // Allocate listeners for each TCP proxy, if requested.
640                let mut ports = vec![];
641                let mut tcp_proxy_addrs = BTreeMap::new();
642                for port in &ports_in {
643                    let tcp_proxy_listener = match &self.config.tcp_proxy {
644                        None => None,
645                        Some(tcp_proxy) => {
646                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
647                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
648                            listener.set_nonblocking(true)?;
649                            let listener = TcpListener::from_std(listener)?;
650                            let local_addr = listener.local_addr()?;
651                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
652                            Some(AddressedTcpListener {
653                                listener,
654                                local_addr,
655                            })
656                        }
657                    };
658                    ports.push(ServiceProcessPort {
659                        name: port.name.clone(),
660                        listen_addr: listen_addrs[&port.name].clone(),
661                        tcp_proxy_listener,
662                    });
663                }
664
665                let mut args = args(ServiceAssignments {
666                    listen_addrs,
667                    peer_addrs: &peer_addrs,
668                });
669                args.push(format!("--process={i}"));
670                if disk {
671                    if let Some(scratch) = &scratch_dir {
672                        args.push(format!("--scratch-directory={}", scratch.display()));
673                    } else {
674                        panic!(
675                            "internal error: service requested disk but no scratch directory was configured"
676                        );
677                    }
678                }
679
680                // Launch supervisor process.
681                let handle = mz_ore::task::spawn(
682                    || format!("process-orchestrator:{full_id}-{i}"),
683                    self.supervise_service_process(ServiceProcessConfig {
684                        id: id.to_string(),
685                        run_dir: run_dir.clone(),
686                        i,
687                        image: image.clone(),
688                        args,
689                        command_wrapper,
690                        ports,
691                        memory_limit,
692                        cpu_limit,
693                        launch_spec: self.config.launch_spec,
694                    }),
695                );
696
697                process_states.push(ProcessState {
698                    _handle: handle.abort_on_drop(),
699                    status: ProcessStatus::NotReady,
700                    status_time: Utc::now(),
701                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
702                    tcp_proxy_addrs,
703                });
704            }
705
706            // Update the in-memory process state. We do this after we've created
707            // all process states to avoid partially updating our in-memory state.
708            services.insert(id, process_states);
709        }
710
711        self.maybe_write_prometheus_service_discovery_file().await;
712
713        Ok(())
714    }
715
716    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
717        let full_id = self.config.full_id(id);
718        let run_dir = self.config.service_run_dir(id);
719        let scratch_dir = self.config.service_scratch_dir(id);
720
721        // Drop the supervisor for the service, if it exists. If this service
722        // was under supervision, this will kill all processes associated with
723        // it.
724        {
725            let mut supervisors = self.services.lock().expect("lock poisoned");
726            supervisors.remove(id);
727        }
728
729        // If the service was orphaned by a prior incarnation of the
730        // orchestrator, it won't have been under supervision and therefore will
731        // still be running. So kill any process that we have state for in the
732        // run directory.
733        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
734            while let Some(entry) = entries.next_entry().await? {
735                let path = entry.path();
736                if path.extension() == Some(OsStr::new("pid")) {
737                    let mut system = System::new();
738                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
739                        continue;
740                    };
741                    let pid = process.pid();
742                    info!("terminating orphaned process for {full_id} with PID {pid}");
743                    process.kill();
744                }
745            }
746        }
747
748        // Clean up the on-disk state of the service.
749        if let Err(e) = remove_dir_all(run_dir).await {
750            if e.kind() != io::ErrorKind::NotFound {
751                warn!(
752                    "error cleaning up run directory for {full_id}: {}",
753                    e.display_with_causes()
754                );
755            }
756        }
757        if let Err(e) = remove_dir_all(scratch_dir).await {
758            if e.kind() != io::ErrorKind::NotFound {
759                warn!(
760                    "error cleaning up scratch directory for {full_id}: {}",
761                    e.display_with_causes()
762                );
763            }
764        }
765
766        self.maybe_write_prometheus_service_discovery_file().await;
767        Ok(())
768    }
769
770    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
771        let mut services = vec![];
772        let namespace_prefix = format!("{}-", self.config.namespace);
773        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
774        while let Some(entry) = entries.next_entry().await? {
775            let filename = entry
776                .file_name()
777                .into_string()
778                .map_err(|_| anyhow!("unable to convert filename to string"))?;
779            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
780                services.push(id.to_string());
781            }
782        }
783        Ok(services)
784    }
785
786    fn supervise_service_process(
787        &self,
788        ServiceProcessConfig {
789            id,
790            run_dir,
791            i,
792            image,
793            args,
794            command_wrapper,
795            ports,
796            memory_limit,
797            cpu_limit,
798            launch_spec,
799        }: ServiceProcessConfig,
800    ) -> impl Future<Output = ()> + use<> {
801        let suppress_output = self.config.suppress_output;
802        let propagate_crashes = self.config.propagate_crashes;
803        let image = self.config.image_dir.join(image);
804        let pid_file = run_dir.join(format!("{i}.pid"));
805        let full_id = self.config.full_id(&id);
806
807        let state_updater = ProcessStateUpdater {
808            namespace: self.config.namespace.clone(),
809            id,
810            i,
811            services: Arc::clone(&self.services),
812            service_event_tx: self.service_event_tx.clone(),
813        };
814
815        async move {
816            let mut proxy_handles = vec![];
817            for port in ports {
818                if let Some(tcp_listener) = port.tcp_proxy_listener {
819                    info!(
820                        "{full_id}-{i}: {} tcp proxy listening on {}",
821                        port.name, tcp_listener.local_addr,
822                    );
823                    let uds_path = port.listen_addr;
824                    let handle = mz_ore::task::spawn(
825                        || format!("{full_id}-{i}-proxy-{}", port.name),
826                        tcp_proxy(TcpProxyConfig {
827                            name: format!("{full_id}-{i}-{}", port.name),
828                            tcp_listener,
829                            uds_path: uds_path.clone(),
830                        }),
831                    );
832                    proxy_handles.push(handle.abort_on_drop());
833                }
834            }
835
836            supervise_existing_process(&state_updater, &pid_file).await;
837
838            loop {
839                let mut cmd = launch_spec.refine_command(
840                    &image,
841                    &args,
842                    &command_wrapper,
843                    memory_limit.as_ref(),
844                    cpu_limit.as_ref(),
845                );
846                info!(
847                    "launching {full_id}-{i} via {} {}...",
848                    cmd.as_std().get_program().to_string_lossy(),
849                    cmd.as_std()
850                        .get_args()
851                        .map(|arg| arg.to_string_lossy())
852                        .join(" ")
853                );
854                if suppress_output {
855                    cmd.stdout(Stdio::null());
856                    cmd.stderr(Stdio::null());
857                }
858                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
859                    .await
860                {
861                    Ok(status) => {
862                        if propagate_crashes && did_process_crash(status) {
863                            panic!(
864                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
865                            );
866                        }
867                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
868                    }
869                    Err(e) => {
870                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
871                    }
872                };
873                state_updater.update_state(ProcessStatus::NotReady);
874                time::sleep(Duration::from_secs(5)).await;
875            }
876        }
877    }
878
879    async fn maybe_write_prometheus_service_discovery_file(&self) {
880        #[derive(Serialize)]
881        struct StaticConfig {
882            labels: BTreeMap<String, String>,
883            targets: Vec<String>,
884        }
885
886        let Some(tcp_proxy) = &self.config.tcp_proxy else {
887            return;
888        };
889        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
890            return;
891        };
892
893        let mut static_configs = vec![];
894        {
895            let services = self.services.lock().expect("lock poisoned");
896            for (id, states) in &*services {
897                for (i, state) in states.iter().enumerate() {
898                    for (name, addr) in &state.tcp_proxy_addrs {
899                        let mut labels = btreemap! {
900                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
901                            "mz_orchestrator_service_id".into() => id.clone(),
902                            "mz_orchestrator_port".into() => name.clone(),
903                            "mz_orchestrator_ordinal".into() => i.to_string(),
904                        };
905                        for (k, v) in &state.labels {
906                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
907                            labels.insert(k, v.clone());
908                        }
909                        static_configs.push(StaticConfig {
910                            labels,
911                            targets: vec![addr.to_string()],
912                        })
913                    }
914                }
915            }
916        }
917
918        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
919        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
920        if let Err(e) = fs::write(&path, &contents).await {
921            warn!(
922                "{}: failed to write prometheus service discovery file: {}",
923                self.config.namespace,
924                e.display_with_causes()
925            );
926        }
927    }
928}
929
930struct ServiceProcessConfig {
931    id: String,
932    run_dir: PathBuf,
933    i: usize,
934    image: String,
935    args: Vec<String>,
936    command_wrapper: Vec<String>,
937    ports: Vec<ServiceProcessPort>,
938    memory_limit: Option<MemoryLimit>,
939    cpu_limit: Option<CpuLimit>,
940    launch_spec: LaunchSpec,
941}
942
943struct ServiceProcessPort {
944    name: String,
945    listen_addr: String,
946    tcp_proxy_listener: Option<AddressedTcpListener>,
947}
948
949/// Supervises a process discovered from an existing PID file, if any.
950async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
951    let name = format!(
952        "{}-{}-{}",
953        state_updater.namespace, state_updater.id, state_updater.i
954    );
955
956    let mut system = System::new();
957    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
958        return;
959    };
960    let pid = process.pid();
961
962    info!(%pid, "discovered existing process for {name}");
963    state_updater.update_state(ProcessStatus::Ready { pid });
964
965    // Kill the process if the future is dropped.
966    let need_kill = AtomicBool::new(true);
967    defer! {
968        state_updater.update_state(ProcessStatus::NotReady);
969        if need_kill.load(Ordering::SeqCst) {
970            info!(%pid, "terminating existing process for {name}");
971            process.kill();
972        }
973    }
974
975    // Periodically check if the process has terminated.
976    let mut system = System::new();
977    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
978        time::sleep(Duration::from_secs(5)).await;
979    }
980
981    // The process has crashed. Exit the function without attempting to
982    // kill it.
983    warn!(%pid, "process for {name} has crashed; will reboot");
984    need_kill.store(false, Ordering::SeqCst)
985}
986
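/// Replaces placeholders in a command-wrapper argument: `%N` expands to the
/// full service ID and `%P:<port>` to the listen address of the named port.
///
/// Illustratively (hypothetical paths), with a full ID of `cluster-u1` and a
/// `controller` port listening at `/tmp/run/controller-0`, the argument
/// `--sock=%P:controller` becomes `--sock=/tmp/run/controller-0`.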
987fn interpolate_command(
988    command_part: &str,
989    full_id: &str,
990    ports: &BTreeMap<String, String>,
991) -> String {
992    let mut command_part = command_part.replace("%N", full_id);
993    for (endpoint, port) in ports {
994        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
995    }
996    command_part
997}
998
999async fn spawn_process(
1000    state_updater: &ProcessStateUpdater,
1001    mut cmd: Command,
1002    pid_file: &Path,
1003    send_sigterm: bool,
1004) -> Result<ExitStatus, anyhow::Error> {
1005    struct KillOnDropChild(Child, bool);
1006
1007    impl Drop for KillOnDropChild {
1008        fn drop(&mut self) {
1009            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
1010                let _ = nix::sys::signal::kill(
1011                    nix::unistd::Pid::from_raw(pid),
1012                    nix::sys::signal::Signal::SIGTERM,
1013                );
1014                // Give the process a bit of time to react to the signal
1015                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
1016            }
1017            let _ = self.0.start_kill();
1018        }
1019    }
1020
1021    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);
1022
1023    // Immediately write out a file containing the PID of the child process and
1024    // its start time. We'll use this state to rediscover our children if we
1025    // crash and restart. There's a very small window where we can crash after
1026    // having spawned the child but before writing this file, in which case we
1027    // might orphan the process. We accept this risk, though. It's hard to do
1028    // anything more robust given the Unix APIs available to us, and the
1029    // solution here is good enough given that the process orchestrator is only
1030    // used in development/testing.
1031    let pid = Pid::from_u32(child.0.id().unwrap());
1032    write_pid_file(pid_file, pid).await?;
1033    state_updater.update_state(ProcessStatus::Ready { pid });
1034    Ok(child.0.wait().await?)
1035}
1036
1037fn did_process_crash(status: ExitStatus) -> bool {
1038    // Likely not exhaustive. Feel free to add additional tests for other
1039    // indications of a crashed child process, as those conditions are
1040    // discovered.
1041    matches!(
1042        status.signal(),
1043        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
1044    )
1045}
1046
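/// Writes a PID file recording the process ID and its start time, one per
/// line, e.g. (illustrative values):
///
/// ```text
/// 12345
/// 1700000000
/// ```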
1047async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
1048    let mut system = System::new();
1049    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
1050    let start_time = system.process(pid).map_or(0, |p| p.start_time());
1051    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
1052    Ok(())
1053}
1054
1055async fn find_process_from_pid_file<'a>(
1056    system: &'a mut System,
1057    pid_file: &Path,
1058) -> Option<&'a Process> {
1059    let Ok(contents) = fs::read_to_string(pid_file).await else {
1060        return None;
1061    };
1062    let lines = contents.trim().split('\n').collect::<Vec<_>>();
1063    let [pid, start_time] = lines.as_slice() else {
1064        return None;
1065    };
1066    let Ok(pid) = Pid::from_str(pid) else {
1067        return None;
1068    };
1069    let Ok(start_time) = u64::from_str(start_time) else {
1070        return None;
1071    };
1072    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
1073    let process = system.process(pid)?;
1074    // Checking the start time protects against killing an unrelated process due
1075    // to PID reuse.
1076    if process.start_time() != start_time {
1077        return None;
1078    }
1079    Some(process)
1080}
1081
1082struct TcpProxyConfig {
1083    name: String,
1084    tcp_listener: AddressedTcpListener,
1085    uds_path: String,
1086}
1087
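/// Proxies TCP connections accepted on the listener to the Unix domain socket
/// at `uds_path`, copying bytes in both directions until either side closes.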
1088async fn tcp_proxy(
1089    TcpProxyConfig {
1090        name,
1091        tcp_listener,
1092        uds_path,
1093    }: TcpProxyConfig,
1094) {
1095    let mut conns = FuturesUnordered::new();
1096    loop {
1097        select! {
1098            res = tcp_listener.listener.accept() => {
1099                debug!("{name}: accepting tcp proxy connection");
1100                let uds_path = uds_path.clone();
1101                conns.push(Box::pin(async move {
1102                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
1103                    let mut uds_conn = UnixStream::connect(uds_path)
1104                        .await
1105                        .context("making uds connection")?;
1106                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
1107                        .await
1108                        .context("proxying")
1109                }));
1110            }
1111            Some(Err(e)) = conns.next() => {
1112                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
1113            }
1114        }
1115    }
1116}
1117
1118struct ProcessStateUpdater {
1119    namespace: String,
1120    id: String,
1121    i: usize,
1122    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
1123    service_event_tx: broadcast::Sender<ServiceEvent>,
1124}
1125
1126impl ProcessStateUpdater {
1127    fn update_state(&self, status: ProcessStatus) {
1128        let mut services = self.services.lock().expect("lock poisoned");
1129        let Some(process_states) = services.get_mut(&self.id) else {
1130            return;
1131        };
1132        let Some(process_state) = process_states.get_mut(self.i) else {
1133            return;
1134        };
1135        let status_time = Utc::now();
1136        process_state.status = status;
1137        process_state.status_time = status_time;
1138        let _ = self.service_event_tx.send(ServiceEvent {
1139            service_id: self.id.to_string(),
1140            process_id: u64::cast_from(self.i),
1141            status: status.into(),
1142            time: status_time,
1143        });
1144    }
1145}
1146
1147#[derive(Debug)]
1148struct ProcessState {
1149    _handle: AbortOnDropHandle<()>,
1150    status: ProcessStatus,
1151    status_time: DateTime<Utc>,
1152    labels: BTreeMap<String, String>,
1153    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
1154}
1155
1156impl ProcessState {
1157    fn pid(&self) -> Option<Pid> {
1158        match &self.status {
1159            ProcessStatus::NotReady => None,
1160            ProcessStatus::Ready { pid } => Some(*pid),
1161        }
1162    }
1163}
1164
1165#[derive(Debug, Clone, Copy)]
1166enum ProcessStatus {
1167    NotReady,
1168    Ready { pid: Pid },
1169}
1170
1171impl From<ProcessStatus> for ServiceStatus {
1172    fn from(status: ProcessStatus) -> ServiceStatus {
1173        match status {
1174            ProcessStatus::NotReady => ServiceStatus::Offline(None),
1175            ProcessStatus::Ready { .. } => ServiceStatus::Online,
1176        }
1177    }
1178}
1179
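/// Computes the Unix domain socket path for the given port and process
/// ordinal. Illustratively, port `controller` of process 0 lives at
/// `<run_dir>/controller-0`; if that path would exceed the platform's socket
/// path limit, a SHA-1-hashed path under the temporary directory is used
/// instead.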
1180fn socket_path(run_dir: &Path, port: &str, process: u16) -> String {
1181    let desired = run_dir
1182        .join(format!("{port}-{process}"))
1183        .to_string_lossy()
1184        .into_owned();
1185    if UnixSocketAddr::from_pathname(&desired).is_err() {
1186        // Unix socket addresses have a very low maximum length of around 100
1187        // bytes on most platforms.
1188        env::temp_dir()
1189            .join(hex::encode(Sha1::digest(desired)))
1190            .display()
1191            .to_string()
1192    } else {
1193        desired
1194    }
1195}
1196
1197struct AddressedTcpListener {
1198    listener: TcpListener,
1199    local_addr: SocketAddr,
1200}
1201
1202#[derive(Debug)]
1203struct ProcessService {
1204    id: String,
1205    run_dir: PathBuf,
1206    scale: NonZero<u16>,
1207    /// Shared reference to the services state, used to look up TCP proxy addresses.
1208    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
1209}
1210
1211impl Service for ProcessService {
1212    fn addresses(&self, port: &str) -> Vec<String> {
1213        (0..self.scale.get())
1214            .map(|i| socket_path(&self.run_dir, port, i))
1215            .collect()
1216    }
1217
1218    fn tcp_addresses(&self, port: &str) -> Vec<String> {
1219        let guard = self.services.lock().expect("lock poisoned");
1220        let Some(states) = guard.get(&self.id) else {
1221            // Service not yet provisioned, return empty
1222            return Vec::new();
1223        };
1224
1225        states
1226            .iter()
1227            .filter_map(|state| state.tcp_proxy_addrs.get(port))
1228            .map(|addr| addr.to_string())
1229            .collect()
1230    }
1231}