mz_orchestrator_process/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Context, anyhow, bail};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::{BoxStream, FuturesUnordered};
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use maplit::btreemap;
use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
use mz_orchestrator::{
    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service, ServiceConfig,
    ServiceEvent, ServicePort, ServiceProcessMetrics, ServiceStatus,
};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use scopeguard::defer;
use serde::Serialize;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
use tokio::fs::remove_dir_all;
use tokio::net::{TcpListener, UnixStream};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::{fs, io, select};
use tracing::{debug, error, info, warn};

pub mod secrets;

/// Configures a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    /// The directory in which the orchestrator should look for executable
    /// images.
    pub image_dir: PathBuf,
    /// Whether to suppress output from spawned subprocesses.
    pub suppress_output: bool,
    /// The ID of the environment under orchestration.
    pub environment_id: String,
    /// The directory in which to store secrets.
    pub secrets_dir: PathBuf,
    /// A command to wrap the child command invocation.
    pub command_wrapper: Vec<String>,
    /// Whether to crash this process if a child process crashes.
    pub propagate_crashes: bool,
    /// TCP proxy configuration.
    ///
    /// When enabled, for each named port of each created service, the process
    /// orchestrator will bind a TCP listener that proxies incoming connections
    /// to the underlying Unix domain socket. Each bound TCP address will be
    /// emitted as a tracing event.
    ///
    /// The primary use is live debugging the running child services via tools
    /// that do not support Unix domain sockets (e.g., Prometheus, web
    /// browsers).
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    /// A scratch directory that orchestrated processes can use for ephemeral storage.
    pub scratch_directory: PathBuf,
}

/// Configures the TCP proxy for a [`ProcessOrchestrator`].
///
/// See [`ProcessOrchestratorConfig::tcp_proxy`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    /// The IP address on which to bind TCP listeners.
    pub listen_addr: IpAddr,
    /// A directory in which to write Prometheus scrape targets, for use with
    /// Prometheus's file-based service discovery.
    ///
    /// Each [`NamespacedOrchestrator`] will maintain a single JSON file in the
    /// directory, named `NAMESPACE.json`, containing the scrape targets for
    /// all extant services. The scrape targets will use the TCP proxy address,
    /// as Prometheus does not support scraping over Unix domain sockets.
    ///
    /// See also: <https://prometheus.io/docs/guides/file-sd/>
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}

/// An orchestrator backed by processes on the local machine.
///
/// **This orchestrator is for development only.** Due to limitations in the
/// Unix process API, it does not exactly conform to the documented semantics
/// of `Orchestrator`.
#[derive(Debug)]
pub struct ProcessOrchestrator {
    image_dir: PathBuf,
    suppress_output: bool,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    /// Directly execute the provided binary
    Direct,
    /// Use Systemd to start the binary
    Systemd,
}

impl LaunchSpec {
    fn determine_implementation() -> Result<Self, anyhow::Error> {
        // According to https://www.freedesktop.org/software/systemd/man/latest/sd_booted.html
        // checking for `/run/systemd/system/` is the canonical way to determine if the system
        // was booted up with systemd.
        match Path::new("/run/systemd/system/").try_exists()? {
            true => Ok(Self::Systemd),
            false => Ok(Self::Direct),
        }
    }

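    /// Builds the [`Command`] used to launch a service process: the image
    /// followed by its arguments, optionally prefixed by the configured
    /// command wrapper (with its `%N`/`%P:NAME` placeholders interpolated)
    /// and, for the `Systemd` launch spec, wrapped in a `systemd-run` scope
    /// that carries the memory and CPU limits.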
    fn refine_command(
        &self,
        image: impl AsRef<OsStr>,
        args: &[impl AsRef<OsStr>],
        wrapper: &[String],
        full_id: &str,
        listen_addrs: &BTreeMap<String, String>,
        memory_limit: Option<&MemoryLimit>,
        cpu_limit: Option<&CpuLimit>,
    ) -> Command {
        let wrapper_parts = || {
            (
                &wrapper[0],
                wrapper[1..]
                    .iter()
                    .map(|part| interpolate_command(part, full_id, listen_addrs)),
            )
        };

        let mut cmd = match self {
            Self::Direct => {
                if wrapper.is_empty() {
                    Command::new(image)
                } else {
                    let (program, wrapper_args) = wrapper_parts();
                    let mut cmd = Command::new(program);
                    cmd.args(wrapper_args);
                    cmd.arg(image);
                    cmd
                }
            }
            Self::Systemd => {
                let mut cmd = Command::new("systemd-run");
                cmd.args(["--user", "--scope", "--quiet"]);
                if let Some(memory_limit) = memory_limit {
                    let memory_limit = memory_limit.0.as_u64();
                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
                    // TODO: We could set `-p MemorySwapMax=0` here to disable regular swap.
                }
                if let Some(cpu_limit) = cpu_limit {
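                    // systemd's `CPUQuota` is expressed as a percentage of a
                    // single CPU, so convert millicpus to percent, rounding up.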
                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
                }

                if !wrapper.is_empty() {
                    let (program, wrapper_args) = wrapper_parts();
                    cmd.arg(program);
                    cmd.args(wrapper_args);
                };
                cmd.arg(image);
                cmd
            }
        };
        cmd.args(args);
        cmd
    }
}

impl ProcessOrchestrator {
    /// Creates a new process orchestrator from the provided configuration.
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}

impl Orchestrator for ProcessOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                _worker: worker,
            })
        }))
    }
}

/// Configuration for a [`NamespacedProcessOrchestrator`].
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

impl NamespacedProcessOrchestratorConfig {
    fn full_id(&self, id: &str) -> String {
        format!("{}-{}", self.namespace, id)
    }

    fn service_run_dir(&self, id: &str) -> PathBuf {
        self.metadata_dir.join(&self.full_id(id))
    }

    fn service_scratch_dir(&self, id: &str) -> PathBuf {
        self.scratch_directory.join(&self.full_id(id))
    }
}

#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    _worker: AbortOnDropHandle<()>,
}

impl NamespacedProcessOrchestrator {
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let always_use_disk = self
            .scheduling_config
            .read()
            .expect("poisoned")
            .always_use_disk;

        let service = ProcessService {
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
        };
        // Determining whether to enable disk is subtle because we need to
        // support historical sizes in the managed service and custom sizes in
        // self-hosted deployments.
        let disk = {
            // Whether the user specified `DISK = TRUE` when creating the
            // replica OR whether the feature flag to force disk is enabled.
            let user_requested_disk = config.disk || always_use_disk;
            // Whether the cluster replica size map provided by the
            // administrator explicitly indicates that the size does not support
            // disk.
            let size_disables_disk = config.disk_limit == Some(DiskLimit::ZERO);
            // Enable disk if the user requested it and the size does not
            // disable it.
            //
            // Arguably we should not allow the user to request disk with sizes
            // that have a zero disk limit, but configuring disk on a
            // replica-by-replica basis is a legacy option that we hope to
            // remove someday.
            user_requested_disk && !size_disables_disk
        };

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        let mut initial_events = vec![];
        let mut service_event_rx = {
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    fn update_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result include a `result_tx` on which the
/// [`OrchestratorWorker`] will deliver the result.
enum WorkerCommand {
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    DropService {
        id: String,
    },
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}

/// Describes the desired state of a process.
struct EnsureServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port.
    pub args: Box<dyn Fn(&BTreeMap<String, String>) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// Whether scratch disk space should be allocated for the service.
    pub disk: bool,
}

/// A task executing blocking work for a [`NamespacedProcessOrchestrator`] in the background.
///
/// This type exists to enable making [`NamespacedProcessOrchestrator::ensure_service`] and
/// [`NamespacedProcessOrchestrator::drop_service`] non-blocking, allowing invocation of these
/// methods in latency-sensitive contexts.
///
/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
/// orchestrator calls that query service state (such as `list_services`). These need to be
/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
/// ensure that a `list_services` result contains exactly those services that were previously
/// created with `ensure_service` and not yet dropped with `drop_service`.
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}

impl OrchestratorWorker {
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
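                            // `cpu_usage()` reports usage as a percentage of a
                            // single core, so multiply by 10^7 to convert
                            // percent to nanocores (1 core = 10^9 nanocores).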
                            // Justification for `expect`:
                            //
                            // `u64::try_cast_from(f: f64)`
                            // will always succeed if 0 <= f < 2^64.
                            // Since the max value of `process.cpu_usage()` is
                            // 100.0 * num_of_cores, this will be true whenever there
                            // are fewer than 2^64 / 10^9 logical cores, or about
                            // 18 billion.
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                // Process orchestrator does not support this right now.
                disk_usage_bytes: None,
            });
        }
        Ok(metrics)
    }

    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        {
            let mut services = self.services.lock().expect("lock poisoned");
            let process_states = services.entry(id.clone()).or_default();

            // Create the state for new processes.
            let mut new_process_states = vec![];
            for i in process_states.len()..scale.into() {
                // Allocate listeners for each TCP proxy, if requested.
                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        tcp_proxy_listener,
                    });
                }

                // Launch supervisor process.
                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        scratch_dir: scratch_dir.clone(),
                        i,
                        image: image.clone(),
                        args: &args,
                        ports,
                        memory_limit,
                        cpu_limit,
                        disk,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                new_process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            // Update the in-memory process state. We do this after we've created
            // all process states to avoid partially updating our in-memory state.
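            // Truncating also drops the `ProcessState`s (and thus the
            // `AbortOnDropHandle`s) of any processes beyond the new scale,
            // which aborts their supervisor tasks and kills the corresponding
            // child processes.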
            process_states.truncate(scale.into());
            process_states.extend(new_process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        // Drop the supervisor for the service, if it exists. If this service
        // was under supervision, this will kill all processes associated with
        // it.
        {
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        // If the service was orphaned by a prior incarnation of the
        // orchestrator, it won't have been under supervision and therefore will
        // still be running. So kill any process that we have state for in the
        // run directory.
        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        // Clean up the on-disk state of the service.
        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

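    /// Returns a future that supervises a single process of a service: it
    /// starts any configured TCP proxies, adopts a process left over from a
    /// previous incarnation of the orchestrator if one is still running, and
    /// then repeatedly launches and monitors the process itself.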
    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            scratch_dir,
            i,
            image,
            args,
            ports,
            memory_limit,
            cpu_limit,
            disk,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let command_wrapper = self.config.command_wrapper.clone();
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        let listen_addrs = ports
            .iter()
            .map(|p| {
                let addr = socket_path(&run_dir, &p.name, i);
                (p.name.clone(), addr)
            })
            .collect();
        let mut args = args(&listen_addrs);

        if disk {
            if let Some(scratch) = &scratch_dir {
                args.push(format!("--scratch-directory={}", scratch.display()));
            } else {
                panic!(
                    "internal error: service requested disk but no scratch directory was configured"
                );
            }
        }

        async move {
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = &listen_addrs[&port.name];
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            supervise_existing_process(&state_updater, &pid_file).await;

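            // Launch the process and supervise it, relaunching with a 5s
            // backoff whenever it exits or fails to spawn.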
            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    &full_id,
                    &listen_addrs,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        if propagate_crashes && did_process_crash(status) {
                            panic!(
                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                            );
                        }
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

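    /// Writes the Prometheus file-SD scrape targets for all extant services in
    /// this namespace to `NAMESPACE.json` in the configured service discovery
    /// directory, if TCP proxying and a discovery directory are configured.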
    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}

struct ServiceProcessConfig<'a> {
    id: String,
    run_dir: PathBuf,
    scratch_dir: Option<PathBuf>,
    i: usize,
    image: String,
    args: &'a (dyn Fn(&BTreeMap<String, String>) -> Vec<String> + Send + Sync),
    ports: Vec<ServiceProcessPort>,
    disk: bool,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}

struct ServiceProcessPort {
    name: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}

/// Supervises an existing process, if it exists.
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    // Kill the process if the future is dropped.
    let need_kill = AtomicBool::new(true);
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    // Periodically check if the process has terminated.
    let mut system = System::new();
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    // The process has crashed. Exit the function without attempting to
    // kill it.
    warn!(%pid, "process for {name} has crashed; will reboot");
    need_kill.store(false, Ordering::SeqCst)
}

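/// Expands placeholders in a command wrapper argument: `%N` is replaced with
/// the service's full ID and `%P:NAME` with the listen address assigned to the
/// named port.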
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let mut command_part = command_part.replace("%N", full_id);
    for (endpoint, port) in ports {
        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
    }
    command_part
}

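/// Spawns the service process described by `cmd`, records its PID and start
/// time in `pid_file`, and waits for it to exit. If the returned future is
/// dropped, the child is killed; when `send_sigterm` is true it is first sent
/// SIGTERM and given a short grace period to shut down.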
async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
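    // Wrapper that kills the child process when dropped, optionally (per the
    // bool flag) sending SIGTERM first to allow a graceful shutdown.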
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                // Give the process a bit of time to react to the signal
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    // Immediately write out a file containing the PID of the child process and
    // its start time. We'll use this state to rediscover our children if we
    // crash and restart. There's a very small window where we can crash after
    // having spawned the child but before writing this file, in which case we
    // might orphan the process. We accept this risk, though. It's hard to do
    // anything more robust given the Unix APIs available to us, and the
    // solution here is good enough given that the process orchestrator is only
    // used in development/testing.
    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}

fn did_process_crash(status: ExitStatus) -> bool {
    // Likely not exhaustive. Feel free to add additional tests for other
    // indications of a crashed child process, as those conditions are
    // discovered.
    matches!(
        status.signal(),
        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
    )
}

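/// Writes a PID file containing the process's PID on the first line and its
/// start time on the second, in the format expected by
/// `find_process_from_pid_file`.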
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}

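/// Looks up the process recorded in `pid_file`, returning it only if a process
/// with that PID is still running and its start time matches the one recorded
/// in the file.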
async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    // Checking the start time protects against killing an unrelated process due
    // to PID reuse.
    if process.start_time() != start_time {
        return None;
    }
    Some(process)
}

struct TcpProxyConfig {
    name: String,
    tcp_listener: AddressedTcpListener,
    uds_path: String,
}

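/// Accepts connections on the TCP listener and proxies each one to the
/// service's Unix domain socket at `uds_path`.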
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            Some(Err(e)) = conns.next() => {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}

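/// A handle for updating the in-memory status of a single service process and
/// broadcasting the corresponding [`ServiceEvent`] to watchers.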
struct ProcessStateUpdater {
    namespace: String,
    id: String,
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}

impl ProcessStateUpdater {
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}

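/// The in-memory state tracked for a single process of a service.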
#[derive(Debug)]
struct ProcessState {
    _handle: AbortOnDropHandle<()>,
    status: ProcessStatus,
    status_time: DateTime<Utc>,
    labels: BTreeMap<String, String>,
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}

impl ProcessState {
    fn pid(&self) -> Option<Pid> {
        match &self.status {
            ProcessStatus::NotReady => None,
            ProcessStatus::Ready { pid } => Some(*pid),
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    NotReady,
    Ready { pid: Pid },
}

impl From<ProcessStatus> for ServiceStatus {
    fn from(status: ProcessStatus) -> ServiceStatus {
        match status {
            ProcessStatus::NotReady => ServiceStatus::Offline(None),
            ProcessStatus::Ready { .. } => ServiceStatus::Online,
        }
    }
}

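/// Returns the path of the Unix domain socket for the given port of the given
/// process, falling back to a SHA-1-derived name under the system temporary
/// directory when the preferred path would exceed the platform's socket path
/// length limit.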
fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
    let desired = run_dir
        .join(format!("{port}-{process}"))
        .to_string_lossy()
        .into_owned();
    if UnixSocketAddr::from_pathname(&desired).is_err() {
        // Unix socket addresses have a very low maximum length of around 100
        // bytes on most platforms.
        env::temp_dir()
            .join(hex::encode(Sha1::digest(desired)))
            .display()
            .to_string()
    } else {
        desired
    }
}

struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}

#[derive(Debug, Clone)]
struct ProcessService {
    run_dir: PathBuf,
    scale: u16,
}

impl Service for ProcessService {
    fn addresses(&self, port: &str) -> Vec<String> {
        (0..self.scale)
            .map(|i| socket_path(&self.run_dir, port, i.into()))
            .collect()
    }
}