use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{
Capabilities, Container, ContainerPort, EnvVar, HTTPGetAction, PodSpec,
PodTemplateSpec, Probe, SeccompProfile, SecurityContext, Service, ServicePort,
ServiceSpec,
},
networking::v1::{
IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
NetworkPolicySpec,
},
},
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
};
use kube::{api::ObjectMeta, Api, Client};
use maplit::btreemap;
use crate::k8s::apply_resource;
use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
/// HTTP port of the console container. It is used as the container port, the
/// probe port, the port opened by the ingress network policy, and the target
/// port of the console service.
const CONSOLE_IMAGE_HTTP_PORT: i32 = 8080;
/// The Kubernetes objects that make up the console for one `Materialize`
/// instance. Built by `Resources::new` and sent to the cluster by
/// `Resources::apply`.
pub struct Resources {
/// Network policies for the console pods; may be empty.
network_policies: Vec<NetworkPolicy>,
/// Deployment that runs the console container.
console_deployment: Box<Deployment>,
/// Service that selects the console pods.
console_service: Box<Service>,
}
impl Resources {
    /// Builds the console objects (network policies, deployment, service) for
    /// `mz` in memory. Nothing is sent to the cluster here.
    ///
    /// `console_image_ref` is the image reference used for the console
    /// container.
    ///
    /// # Panics
    ///
    /// Panics if `config.enable_tls` is set, because building the deployment
    /// hits `unimplemented!()` in that case.
    pub fn new(config: &super::Args, mz: &Materialize, console_image_ref: &str) -> Self {
        // Field initializers run top to bottom, so the objects are built in
        // the order: network policies, deployment, service.
        Self {
            network_policies: create_network_policies(config, mz),
            console_deployment: Box::new(create_console_deployment_object(
                config,
                mz,
                console_image_ref,
            )),
            console_service: Box::new(create_console_service_object(config, mz)),
        }
    }

    /// Applies every held object to `namespace` through `client`.
    ///
    /// Objects are applied one after another: all network policies, then the
    /// deployment, then the service.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `apply_resource`; objects later in
    /// the sequence are not applied in that case.
    pub async fn apply(&self, client: &Client, namespace: &str) -> Result<(), anyhow::Error> {
        let policies: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
        let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
        let services: Api<Service> = Api::namespaced(client.clone(), namespace);

        for policy in self.network_policies.iter() {
            apply_resource(&policies, policy).await?;
        }
        apply_resource(&deployments, &self.console_deployment).await?;
        apply_resource(&services, &self.console_service).await?;

        Ok(())
    }
}
fn create_network_policies(config: &super::Args, mz: &Materialize) -> Vec<NetworkPolicy> {
let mut network_policies = Vec::new();
if config.network_policies.ingress_enabled {
let console_label_selector = LabelSelector {
match_labels: Some(
mz.default_labels()
.into_iter()
.chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
.collect(),
),
..Default::default()
};
network_policies.extend([NetworkPolicy {
metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
spec: Some(NetworkPolicySpec {
ingress: Some(vec![NetworkPolicyIngressRule {
from: Some(
config
.network_policies
.ingress_cidrs
.iter()
.map(|cidr| NetworkPolicyPeer {
ip_block: Some(IPBlock {
cidr: cidr.to_owned(),
except: None,
}),
..Default::default()
})
.collect(),
),
ports: Some(vec![NetworkPolicyPort {
port: Some(IntOrString::Int(CONSOLE_IMAGE_HTTP_PORT)),
protocol: Some("TCP".to_string()),
..Default::default()
}]),
..Default::default()
}]),
pod_selector: console_label_selector,
policy_types: Some(vec!["Ingress".to_owned()]),
..Default::default()
}),
}]);
}
network_policies
}
/// Builds the `Deployment` that runs the console container for `mz`.
///
/// The pod template carries the instance's default labels plus three
/// console-specific labels, and the deployment selector matches on exactly
/// that label set. The single `console` container runs `console_image_ref`,
/// declares `CONSOLE_IMAGE_HTTP_PORT`, and receives `MZ_ENDPOINT` pointing at
/// the environmentd service on `config.environmentd_internal_http_port`.
///
/// # Panics
///
/// Panics with `unimplemented!()` when `config.enable_tls` is set.
fn create_console_deployment_object(
    config: &super::Args,
    mz: &Materialize,
    console_image_ref: &str,
) -> Deployment {
    // Labels shared by the pod template and the deployment selector.
    let mut labels = mz.default_labels();
    labels.extend([
        (
            "materialize.cloud/name".to_owned(),
            mz.console_deployment_name(),
        ),
        ("app".to_owned(), "console".to_string()),
        ("materialize.cloud/app".to_owned(), mz.console_app_name()),
    ]);

    let http_port = ContainerPort {
        container_port: CONSOLE_IMAGE_HTTP_PORT,
        name: Some("http".into()),
        protocol: Some("TCP".into()),
        ..Default::default()
    };

    // All probes issue an HTTP GET for `/` on the console port; they differ
    // only in period and failure threshold.
    let http_probe = |period_seconds: i32, failure_threshold: Option<i32>| Probe {
        http_get: Some(HTTPGetAction {
            path: Some("/".to_string()),
            port: IntOrString::Int(CONSOLE_IMAGE_HTTP_PORT),
            ..Default::default()
        }),
        period_seconds: Some(period_seconds),
        failure_threshold,
        ..Default::default()
    };

    let mz_endpoint = format!(
        "http://{}.{}.svc.cluster.local:{}",
        mz.environmentd_service_name(),
        mz.namespace(),
        config.environmentd_internal_http_port,
    );
    let env = vec![EnvVar {
        name: "MZ_ENDPOINT".to_string(),
        value: Some(mz_endpoint),
        ..Default::default()
    }];

    // TLS is not supported for the console deployment yet.
    if config.enable_tls {
        unimplemented!();
    }

    // Only attach a security context when the operator is configured to.
    let security_context = config.enable_security_context.then(|| SecurityContext {
        run_as_non_root: Some(true),
        capabilities: Some(Capabilities {
            drop: Some(vec!["ALL".to_string()]),
            ..Default::default()
        }),
        seccomp_profile: Some(SeccompProfile {
            type_: "RuntimeDefault".to_string(),
            ..Default::default()
        }),
        allow_privilege_escalation: Some(false),
        ..Default::default()
    });

    let container = Container {
        name: "console".to_owned(),
        image: Some(console_image_ref.to_string()),
        image_pull_policy: Some(config.image_pull_policy.to_string()),
        ports: Some(vec![http_port]),
        env: Some(env),
        startup_probe: Some(http_probe(1, Some(10))),
        readiness_probe: Some(http_probe(30, Some(1))),
        // The liveness probe leaves the failure threshold unset.
        liveness_probe: Some(http_probe(30, None)),
        resources: mz.spec.console_resource_requirements.clone(),
        security_context,
        ..Default::default()
    };

    let pod_spec = PodSpec {
        containers: vec![container],
        node_selector: Some(
            config
                .console_node_selector
                .iter()
                .map(|entry| (entry.key.clone(), entry.value.clone()))
                .collect(),
        ),
        scheduler_name: config.scheduler_name.clone(),
        service_account_name: Some(mz.service_account_name()),
        ..Default::default()
    };

    let spec = DeploymentSpec {
        replicas: Some(2),
        selector: LabelSelector {
            match_labels: Some(labels.clone()),
            ..Default::default()
        },
        template: PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels),
                ..Default::default()
            }),
            spec: Some(pod_spec),
        },
        ..Default::default()
    };

    Deployment {
        metadata: mz.managed_resource_meta(mz.console_deployment_name()),
        spec: Some(spec),
        status: None,
    }
}
/// Builds the `Service` in front of the console pods of `mz`.
///
/// The service selects pods by the `materialize.cloud/name` label and maps
/// `config.console_http_port` to `CONSOLE_IMAGE_HTTP_PORT` over TCP. With
/// `config.local_development` set it is a `NodePort` service with external
/// traffic policy `Local`; otherwise it is a `ClusterIP` service whose
/// cluster IP is `None`.
fn create_console_service_object(config: &super::Args, mz: &Materialize) -> Service {
    // Selector and ports are the same in both modes.
    let common = ServiceSpec {
        selector: Some(
            btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()},
        ),
        ports: Some(vec![ServicePort {
            name: Some("http".to_string()),
            protocol: Some("TCP".to_string()),
            port: config.console_http_port,
            target_port: Some(IntOrString::Int(CONSOLE_IMAGE_HTTP_PORT)),
            ..Default::default()
        }]),
        ..Default::default()
    };

    let spec = match config.local_development {
        true => ServiceSpec {
            type_: Some("NodePort".to_string()),
            external_traffic_policy: Some("Local".to_string()),
            ..common
        },
        false => ServiceSpec {
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            ..common
        },
    };

    Service {
        metadata: mz.managed_resource_meta(mz.console_service_name()),
        spec: Some(spec),
        status: None,
    }
}