Module materialize.cloudtest.k8s.debezium
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from kubernetes.client import (
V1Container,
V1ContainerPort,
V1Deployment,
V1DeploymentSpec,
V1EnvVar,
V1LabelSelector,
V1ObjectMeta,
V1PodSpec,
V1PodTemplateSpec,
V1Service,
V1ServicePort,
V1ServiceSpec,
)
from materialize.cloudtest.k8s import K8sDeployment, K8sService
class DebeziumDeployment(K8sDeployment):
def __init__(self) -> None:
ports = [V1ContainerPort(container_port=8083, name="debezium")]
env = [
V1EnvVar(name="BOOTSTRAP_SERVERS", value="redpanda:9092"),
V1EnvVar(name="CONFIG_STORAGE_TOPIC", value="connect_configs"),
V1EnvVar(name="OFFSET_STORAGE_TOPIC", value="connect_offsets"),
V1EnvVar(name="STATUS_STORAGE_TOPIC", value="connect_statuses"),
# We don't support JSON, so ensure that connect uses AVRO to encode messages and CSR to
# record the schema
V1EnvVar(
name="KEY_CONVERTER", value="io.confluent.connect.avro.AvroConverter"
),
V1EnvVar(
name="VALUE_CONVERTER", value="io.confluent.connect.avro.AvroConverter"
),
V1EnvVar(
name="CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL",
value="http://redpanda:8081",
),
V1EnvVar(
name="CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL",
value="http://redpanda:8081",
),
V1EnvVar(
name="CONNECT_OFFSET_COMMIT_POLICY", value="AlwaysCommitOffsetPolicy"
),
V1EnvVar(name="CONNECT_ERRORS_RETRY_TIMEOUT", value="60000"),
V1EnvVar(name="CONNECT_ERRORS_RETRY_DELAY_MAX_MS", value="1000"),
]
container = V1Container(
name="debezium", image="debezium/connect:1.9.5.Final", env=env, ports=ports
)
template = V1PodTemplateSpec(
metadata=V1ObjectMeta(labels={"app": "debezium"}),
spec=V1PodSpec(containers=[container]),
)
selector = V1LabelSelector(match_labels={"app": "debezium"})
spec = V1DeploymentSpec(replicas=1, template=template, selector=selector)
self.deployment = V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=V1ObjectMeta(name="debezium"),
spec=spec,
)
class DebeziumService(K8sService):
def __init__(self) -> None:
ports = [
V1ServicePort(name="debezium", port=8083),
]
self.service = V1Service(
metadata=V1ObjectMeta(name="debezium", labels={"app": "debezium"}),
spec=V1ServiceSpec(
type="NodePort", ports=ports, selector={"app": "debezium"}
),
)
DEBEZIUM_RESOURCES = [DebeziumDeployment(), DebeziumService()]
Classes
class DebeziumDeployment
-
Expand source code Browse git
class DebeziumDeployment(K8sDeployment): def __init__(self) -> None: ports = [V1ContainerPort(container_port=8083, name="debezium")] env = [ V1EnvVar(name="BOOTSTRAP_SERVERS", value="redpanda:9092"), V1EnvVar(name="CONFIG_STORAGE_TOPIC", value="connect_configs"), V1EnvVar(name="OFFSET_STORAGE_TOPIC", value="connect_offsets"), V1EnvVar(name="STATUS_STORAGE_TOPIC", value="connect_statuses"), # We don't support JSON, so ensure that connect uses AVRO to encode messages and CSR to # record the schema V1EnvVar( name="KEY_CONVERTER", value="io.confluent.connect.avro.AvroConverter" ), V1EnvVar( name="VALUE_CONVERTER", value="io.confluent.connect.avro.AvroConverter" ), V1EnvVar( name="CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL", value="http://redpanda:8081", ), V1EnvVar( name="CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL", value="http://redpanda:8081", ), V1EnvVar( name="CONNECT_OFFSET_COMMIT_POLICY", value="AlwaysCommitOffsetPolicy" ), V1EnvVar(name="CONNECT_ERRORS_RETRY_TIMEOUT", value="60000"), V1EnvVar(name="CONNECT_ERRORS_RETRY_DELAY_MAX_MS", value="1000"), ] container = V1Container( name="debezium", image="debezium/connect:1.9.5.Final", env=env, ports=ports ) template = V1PodTemplateSpec( metadata=V1ObjectMeta(labels={"app": "debezium"}), spec=V1PodSpec(containers=[container]), ) selector = V1LabelSelector(match_labels={"app": "debezium"}) spec = V1DeploymentSpec(replicas=1, template=template, selector=selector) self.deployment = V1Deployment( api_version="apps/v1", kind="Deployment", metadata=V1ObjectMeta(name="debezium"), spec=spec, )
Ancestors
class DebeziumService
-
Expand source code Browse git
class DebeziumService(K8sService): def __init__(self) -> None: ports = [ V1ServicePort(name="debezium", port=8083), ] self.service = V1Service( metadata=V1ObjectMeta(name="debezium", labels={"app": "debezium"}), spec=V1ServiceSpec( type="NodePort", ports=ports, selector={"app": "debezium"} ), )
Ancestors
Inherited members