Module materialize.mzcompose.services.debezium

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from materialize.mzcompose.service import (
    Service,
    ServiceDependency,
)


class Debezium(Service):
    """A Kafka Connect worker service built from the `debezium` mzbuild image.

    Args:
        name: Service name. It is also advertised to Connect through
            `CONNECT_REST_ADVERTISED_HOST_NAME`.
        port: Port listed under the service's `ports`.
        redpanda: If true, the service depends on a healthy `redpanda`
            service instead of `kafka` and `schema-registry`.
        environment: Base `CONNECT_*` environment variables. This list is
            never modified. A copy with `CONNECT_REST_ADVERTISED_HOST_NAME`
            appended is placed in the service config.
    """

    def __init__(
        self,
        name: str = "debezium",
        port: int = 8083,
        redpanda: bool = False,
        environment: list[str] = [
            "CONNECT_BOOTSTRAP_SERVERS=kafka:9092",
            "CONNECT_GROUP_ID=connect",
            "CONNECT_CONFIG_STORAGE_TOPIC=connect_configs",
            "CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1",
            "CONNECT_OFFSET_STORAGE_TOPIC=connect_offsets",
            "CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1",
            "CONNECT_STATUS_STORAGE_TOPIC=connect_statuses",
            "CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1",
            # We don't support JSON, so ensure that connect uses Avro to encode
            # messages and CSR to record the schema.
            "CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter",
            "CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter",
            "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
            "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
            "CONNECT_OFFSET_COMMIT_POLICY=AlwaysCommitOffsetPolicy",
            "CONNECT_ERRORS_RETRY_TIMEOUT=60000",
            "CONNECT_ERRORS_RETRY_DELAY_MAX_MS=1000",
        ],
    ) -> None:
        # Build a fresh list instead of appending in place. `environment`
        # defaults to a single shared list object and may also be a
        # caller-owned list. `.append` would add one more
        # CONNECT_REST_ADVERTISED_HOST_NAME entry on every instantiation and
        # leak it into later instances or back into the caller's list.
        environment = [*environment, f"CONNECT_REST_ADVERTISED_HOST_NAME={name}"]

        depends_on: dict[str, ServiceDependency]
        if redpanda:
            # NOTE(review): presumably `redpanda` stands in for both Kafka and
            # the schema registry. The default environment still targets
            # kafka:9092 and schema-registry:8081, so confirm.
            depends_on = {"redpanda": {"condition": "service_healthy"}}
        else:
            depends_on = {
                "kafka": {"condition": "service_healthy"},
                "schema-registry": {"condition": "service_healthy"},
            }

        super().__init__(
            name=name,
            config={
                "mzbuild": "debezium",
                "init": True,
                "ports": [port],
                "environment": environment,
                "depends_on": depends_on,
                # NOTE(review): the probe always targets 8083, regardless of
                # `port`.
                "healthcheck": {
                    "test": ["CMD", "curl", "-f", "localhost:8083"],
                    "interval": "1s",
                    "start_period": "120s",
                },
            },
        )

Classes

class Debezium (name: str = 'debezium', port: int = 8083, redpanda: bool = False, environment: list[str] = ['CONNECT_BOOTSTRAP_SERVERS=kafka:9092', 'CONNECT_GROUP_ID=connect', 'CONNECT_CONFIG_STORAGE_TOPIC=connect_configs', 'CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1', 'CONNECT_OFFSET_STORAGE_TOPIC=connect_offsets', 'CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1', 'CONNECT_STATUS_STORAGE_TOPIC=connect_statuses', 'CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1', 'CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter', 'CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter', 'CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081', 'CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081', 'CONNECT_OFFSET_COMMIT_POLICY=AlwaysCommitOffsetPolicy', 'CONNECT_ERRORS_RETRY_TIMEOUT=60000', 'CONNECT_ERRORS_RETRY_DELAY_MAX_MS=1000'])

A Docker Compose service in a Composition.

Attributes

name
The name of the service.
config
The definition of the service.
Expand source code Browse git
class Debezium(Service):
    """A Kafka Connect worker service built from the `debezium` mzbuild image.

    Args:
        name: Service name. It is also advertised to Connect through
            `CONNECT_REST_ADVERTISED_HOST_NAME`.
        port: Port listed under the service's `ports`.
        redpanda: If true, the service depends on a healthy `redpanda`
            service instead of `kafka` and `schema-registry`.
        environment: Base `CONNECT_*` environment variables. This list is
            never modified. A copy with `CONNECT_REST_ADVERTISED_HOST_NAME`
            appended is placed in the service config.
    """

    def __init__(
        self,
        name: str = "debezium",
        port: int = 8083,
        redpanda: bool = False,
        environment: list[str] = [
            "CONNECT_BOOTSTRAP_SERVERS=kafka:9092",
            "CONNECT_GROUP_ID=connect",
            "CONNECT_CONFIG_STORAGE_TOPIC=connect_configs",
            "CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1",
            "CONNECT_OFFSET_STORAGE_TOPIC=connect_offsets",
            "CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1",
            "CONNECT_STATUS_STORAGE_TOPIC=connect_statuses",
            "CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1",
            # We don't support JSON, so ensure that connect uses Avro to encode
            # messages and CSR to record the schema.
            "CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter",
            "CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter",
            "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
            "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
            "CONNECT_OFFSET_COMMIT_POLICY=AlwaysCommitOffsetPolicy",
            "CONNECT_ERRORS_RETRY_TIMEOUT=60000",
            "CONNECT_ERRORS_RETRY_DELAY_MAX_MS=1000",
        ],
    ) -> None:
        # Build a fresh list instead of appending in place. `environment`
        # defaults to a single shared list object and may also be a
        # caller-owned list. `.append` would add one more
        # CONNECT_REST_ADVERTISED_HOST_NAME entry on every instantiation and
        # leak it into later instances or back into the caller's list.
        environment = [*environment, f"CONNECT_REST_ADVERTISED_HOST_NAME={name}"]

        depends_on: dict[str, ServiceDependency]
        if redpanda:
            # NOTE(review): presumably `redpanda` stands in for both Kafka and
            # the schema registry. The default environment still targets
            # kafka:9092 and schema-registry:8081, so confirm.
            depends_on = {"redpanda": {"condition": "service_healthy"}}
        else:
            depends_on = {
                "kafka": {"condition": "service_healthy"},
                "schema-registry": {"condition": "service_healthy"},
            }

        super().__init__(
            name=name,
            config={
                "mzbuild": "debezium",
                "init": True,
                "ports": [port],
                "environment": environment,
                "depends_on": depends_on,
                # NOTE(review): the probe always targets 8083, regardless of
                # `port`.
                "healthcheck": {
                    "test": ["CMD", "curl", "-f", "localhost:8083"],
                    "interval": "1s",
                    "start_period": "120s",
                },
            },
        )

Ancestors