orchestratord/
orchestratord.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{
    future,
    net::SocketAddr,
    sync::{Arc, LazyLock},
};

use kube::runtime::watcher;
use tracing::info;

use mz_build_info::{build_info, BuildInfo};
use mz_orchestrator_kubernetes::util::create_client;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_orchestratord::{
    controller,
    k8s::register_crds,
    metrics::{self, Metrics},
};
use mz_ore::{
    cli::{self, CliConfig},
    error::ErrorExt,
    metrics::MetricsRegistry,
};

/// Build metadata for this binary, produced by the `build_info!` macro.
const BUILD_INFO: BuildInfo = build_info!();
/// Version string derived from [`BUILD_INFO`] via `BuildInfo::human_version`,
/// computed lazily on first access. Used as the `version` reported by the
/// command-line parser for [`Args`].
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

// Command-line arguments accepted by `orchestratord`.
//
// NOTE: the comments in this struct are deliberately plain `//` comments
// rather than `///` doc comments: clap turns doc comments into help text,
// which would change the `--help` output.
#[derive(clap::Parser)]
#[clap(name = "orchestratord", version = VERSION.as_str())]
pub struct Args {
    // Kubernetes context handed to `create_client` when building the client.
    // Uses the same `#[clap(...)]` attribute as the rest of the struct.
    #[clap(long, env = "KUBERNETES_CONTEXT", default_value = "minikube")]
    kubernetes_context: String,

    // Address the profiling HTTP server binds to.
    #[clap(long, default_value = "[::]:8004")]
    profiling_listen_address: SocketAddr,
    // Address the metrics HTTP server binds to.
    #[clap(long, default_value = "[::]:3100")]
    metrics_listen_address: SocketAddr,

    // Arguments forwarded to the materialize controller's context.
    #[clap(flatten)]
    materialize_controller_args: controller::materialize::MaterializeControllerArgs,

    // Tracing-related arguments; used to configure tracing at startup and
    // also forwarded to the materialize controller's context.
    #[clap(flatten)]
    tracing: TracingCliArgs,
}

#[tokio::main]
async fn main() {
    mz_ore::panic::install_enhanced_handler();

    let args = cli::parse_args(CliConfig {
        env_prefix: Some("ORCHESTRATORD_"),
        enable_version_flag: true,
    });
    if let Err(err) = run(args).await {
        panic!("orchestratord: fatal: {}", err.display_with_causes());
    }
}

async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let (_, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "orchestratord",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let metrics = Arc::new(Metrics::register_into(&metrics_registry));

    let (client, namespace) = create_client(args.kubernetes_context.clone()).await?;
    register_crds(client.clone()).await?;

    {
        let router = mz_prof_http::router(&BUILD_INFO);
        let address = args.profiling_listen_address.clone();
        mz_ore::task::spawn(|| "profiling API internal web server", async move {
            if let Err(e) = axum::serve(
                tokio::net::TcpListener::bind(&address).await.unwrap(),
                router.into_make_service(),
            )
            .await
            {
                panic!(
                    "profiling API internal web server failed: {}",
                    e.display_with_causes()
                );
            }
        });
    }

    {
        let router = metrics::router(metrics_registry.clone());
        let address = args.metrics_listen_address.clone();
        mz_ore::task::spawn(|| "metrics server", async move {
            if let Err(e) = axum::serve(
                tokio::net::TcpListener::bind(&address).await.unwrap(),
                router.into_make_service(),
            )
            .await
            {
                panic!("metrics server failed: {}", e.display_with_causes());
            }
        });
    }

    mz_ore::task::spawn(
        || "materialize controller",
        k8s_controller::Controller::namespaced_all(
            client.clone(),
            mz_orchestratord::controller::materialize::Context::new(
                args.materialize_controller_args,
                args.tracing,
                namespace,
                Arc::clone(&metrics),
            ),
            watcher::Config::default().timeout(29),
        )
        .run(),
    );

    info!("All tasks started successfully.");

    future::pending().await
}