Module materialize.mzcompose.services.schema_registry
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.mzcompose import DEFAULT_CONFLUENT_PLATFORM_VERSION
from materialize.mzcompose.service import Service, ServiceConfig
class SchemaRegistry(Service):
    """A Confluent Schema Registry service definition for a composition.

    Assembles the Docker Compose configuration (image, exposed port, network
    aliases, environment, dependencies, health check, and volumes) and passes
    it to ``Service``.
    """

    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        """Build the service configuration and register it under ``name``.

        Args:
            name: Name of the service; also exported to the container as
                ``SCHEMA_REGISTRY_HOST_NAME``.
            aliases: Aliases for the service on the ``default`` network.
            image: Docker image repository, combined with ``tag`` as
                ``image:tag``.
            tag: Docker image tag.
            port: Port listed under the service's ``ports``. Note that the
                health check always probes ``localhost:8081``, independent
                of this value.
            kafka_servers: ``(host, port)`` pairs. Each pair becomes a
                ``PLAINTEXT://host:port`` bootstrap server, and each host
                becomes a dependency that must be healthy before this
                service starts.
            environment_extra: Environment entries appended after the
                built-in ones.
            depends_on_extra: Additional services that only need to have
                started before this service starts.
            volumes: Volume specifications for the service.
            platform: Value for the ``platform`` key; the key is omitted
                when this is ``None`` or empty.
        """
        # The loop variables are deliberately not called ``port`` so they
        # cannot be confused with the ``port`` parameter above.
        bootstrap_servers = ",".join(
            f"PLAINTEXT://{kafka_host}:{kafka_port}"
            for kafka_host, kafka_port in kafka_servers
        )
        environment = [
            # Under Docker, Kafka can be really slow, which means the default
            # Kafka connection timeout of 500ms is much too short.
            "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
            f"SCHEMA_REGISTRY_HOST_NAME={name}",
            f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
            *environment_extra,
        ]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            # Copy the list: the default argument is a single shared object,
            # so storing it directly would let an in-place edit of this
            # config leak into every later instance (or the caller's list).
            "networks": {"default": {"aliases": list(aliases)}},
            "environment": environment,
            "depends_on": {
                **{
                    kafka_host: {"condition": "service_healthy"}
                    for kafka_host, _ in kafka_servers
                },
                # Merged second, so an entry here overrides a Kafka host of
                # the same name.
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": [
                    "CMD",
                    "curl",
                    # We provide credentials in case the schema registry is
                    # configured to require HTTP authentication, as there's
                    # no health check endpoint that's excluded from
                    # authentication requirements. The credentials are
                    # safely ignored if the schema registry is not
                    # configured to require them.
                    "-fu",
                    "materialize:sekurity",
                    "localhost:8081",
                ],
                "interval": "1s",
                "start_period": "120s",
            },
            # Copied for the same reason as the aliases above.
            "volumes": list(volumes),
        }
        if platform:
            config["platform"] = platform
        super().__init__(
            name=name,
            config=config,
        )
Classes
class SchemaRegistry (name: str = 'schema-registry', aliases: list[str] = [], image: str = 'confluentinc/cp-schema-registry', tag: str = '7.5.2', port: int = 8081, kafka_servers: list[tuple[str, str]] = [('kafka', '9092')], environment_extra: list[str] = [], depends_on_extra: list[str] = [], volumes: list[str] = [], platform: str | None = None)
A Docker Compose service in a Composition.

Attributes
name
- The name of the service.
config
- The definition of the service.
Expand source code Browse git
class SchemaRegistry(Service):
    """A Confluent Schema Registry service definition for a composition.

    Assembles the Docker Compose configuration (image, exposed port, network
    aliases, environment, dependencies, health check, and volumes) and passes
    it to ``Service``.
    """

    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        """Build the service configuration and register it under ``name``.

        Args:
            name: Name of the service; also exported to the container as
                ``SCHEMA_REGISTRY_HOST_NAME``.
            aliases: Aliases for the service on the ``default`` network.
            image: Docker image repository, combined with ``tag`` as
                ``image:tag``.
            tag: Docker image tag.
            port: Port listed under the service's ``ports``. Note that the
                health check always probes ``localhost:8081``, independent
                of this value.
            kafka_servers: ``(host, port)`` pairs. Each pair becomes a
                ``PLAINTEXT://host:port`` bootstrap server, and each host
                becomes a dependency that must be healthy before this
                service starts.
            environment_extra: Environment entries appended after the
                built-in ones.
            depends_on_extra: Additional services that only need to have
                started before this service starts.
            volumes: Volume specifications for the service.
            platform: Value for the ``platform`` key; the key is omitted
                when this is ``None`` or empty.
        """
        # The loop variables are deliberately not called ``port`` so they
        # cannot be confused with the ``port`` parameter above.
        bootstrap_servers = ",".join(
            f"PLAINTEXT://{kafka_host}:{kafka_port}"
            for kafka_host, kafka_port in kafka_servers
        )
        environment = [
            # Under Docker, Kafka can be really slow, which means the default
            # Kafka connection timeout of 500ms is much too short.
            "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
            f"SCHEMA_REGISTRY_HOST_NAME={name}",
            f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
            *environment_extra,
        ]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            # Copy the list: the default argument is a single shared object,
            # so storing it directly would let an in-place edit of this
            # config leak into every later instance (or the caller's list).
            "networks": {"default": {"aliases": list(aliases)}},
            "environment": environment,
            "depends_on": {
                **{
                    kafka_host: {"condition": "service_healthy"}
                    for kafka_host, _ in kafka_servers
                },
                # Merged second, so an entry here overrides a Kafka host of
                # the same name.
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": [
                    "CMD",
                    "curl",
                    # We provide credentials in case the schema registry is
                    # configured to require HTTP authentication, as there's
                    # no health check endpoint that's excluded from
                    # authentication requirements. The credentials are
                    # safely ignored if the schema registry is not
                    # configured to require them.
                    "-fu",
                    "materialize:sekurity",
                    "localhost:8081",
                ],
                "interval": "1s",
                "start_period": "120s",
            },
            # Copied for the same reason as the aliases above.
            "volumes": list(volumes),
        }
        if platform:
            config["platform"] = platform
        super().__init__(
            name=name,
            config=config,
        )
Ancestors