Module materialize.mzcompose.services.schema_registry

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from materialize.mzcompose import DEFAULT_CONFLUENT_PLATFORM_VERSION
from materialize.mzcompose.service import Service, ServiceConfig


class SchemaRegistry(Service):
    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        bootstrap_servers = ",".join(
            f"PLAINTEXT://{host}:{port}" for host, port in kafka_servers
        )
        environment = [
            # Under Docker, Kafka can be really slow, which means the default
            # Kafka connection timeout of 500ms is much too slow.
            "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
            f"SCHEMA_REGISTRY_HOST_NAME={name}",
            f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
            *environment_extra,
        ]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            "networks": {"default": {"aliases": aliases}},
            "environment": environment,
            "depends_on": {
                **{host: {"condition": "service_healthy"} for host, _ in kafka_servers},
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": [
                    "CMD",
                    "curl",
                    # We provide credentials in case the schema registry is
                    # configured to require HTTP authentication, as there's
                    # no health check endpoint that's excluded from
                    # authentication requirements. The credentials are
                    # safely ignored if the schema registry is not
                    # configured to require them.
                    "-fu",
                    "materialize:sekurity",
                    "localhost:8081",
                ],
                "interval": "1s",
                "start_period": "120s",
            },
            "volumes": volumes,
        }
        if platform:
            config["platform"] = platform
        super().__init__(
            name=name,
            config=config,
        )

Classes

class SchemaRegistry (name: str = 'schema-registry', aliases: list[str] = [], image: str = 'confluentinc/cp-schema-registry', tag: str = '7.5.2', port: int = 8081, kafka_servers: list[tuple[str, str]] = [('kafka', '9092')], environment_extra: list[str] = [], depends_on_extra: list[str] = [], volumes: list[str] = [], platform: str | None = None)

A Docker Compose service in a Composition.

Attributes

name
The name of the service.
config
The definition of the service.
Expand source code Browse git
class SchemaRegistry(Service):
    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        bootstrap_servers = ",".join(
            f"PLAINTEXT://{host}:{port}" for host, port in kafka_servers
        )
        environment = [
            # Under Docker, Kafka can be really slow, which means the default
            # Kafka connection timeout of 500ms is much too slow.
            "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
            f"SCHEMA_REGISTRY_HOST_NAME={name}",
            f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
            *environment_extra,
        ]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            "networks": {"default": {"aliases": aliases}},
            "environment": environment,
            "depends_on": {
                **{host: {"condition": "service_healthy"} for host, _ in kafka_servers},
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": [
                    "CMD",
                    "curl",
                    # We provide credentials in case the schema registry is
                    # configured to require HTTP authentication, as there's
                    # no health check endpoint that's excluded from
                    # authentication requirements. The credentials are
                    # safely ignored if the schema registry is not
                    # configured to require them.
                    "-fu",
                    "materialize:sekurity",
                    "localhost:8081",
                ],
                "interval": "1s",
                "start_period": "120s",
            },
            "volumes": volumes,
        }
        if platform:
            config["platform"] = platform
        super().__init__(
            name=name,
            config=config,
        )

Ancestors