Module materialize.mzcompose.services.kafka

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from materialize.mzcompose import (
    DEFAULT_CONFLUENT_PLATFORM_VERSION,
)
from materialize.mzcompose.service import (
    Service,
    ServiceConfig,
)


class Kafka(Service):
    def __init__(
        self,
        name: str = "kafka",
        image: str = "confluentinc/cp-kafka",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        ports: list[str | int] | None = None,
        allow_host_ports: bool = False,
        auto_create_topics: bool = False,
        broker_id: int = 1,
        offsets_topic_replication_factor: int = 1,
        advertised_listeners: list[str] = [],
        environment: list[str] = [
            "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
            "KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
            "KAFKA_MIN_INSYNC_REPLICAS=1",
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
            "KAFKA_MESSAGE_MAX_BYTES=15728640",
            "KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
            "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
        ],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        if not advertised_listeners:
            advertised_listeners = [f"PLAINTEXT://{name}:9092"]
        environment = [
            *environment,
            f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
            f"KAFKA_BROKER_ID={broker_id}",
            *environment_extra,
        ]
        if ports is None:
            ports = [l.split(":")[-1] for l in advertised_listeners]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": ports,
            "allow_host_ports": allow_host_ports,
            "environment": [
                *environment,
                f"KAFKA_AUTO_CREATE_TOPICS_ENABLE={auto_create_topics}",
                f"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={offsets_topic_replication_factor}",
            ],
            "depends_on": {
                "zookeeper": {"condition": "service_healthy"},
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": ["CMD", "nc", "-z", "localhost", "9092"],
                "interval": "1s",
                "start_period": "120s",
            },
            "volumes": volumes,
        }
        if platform:
            config["platform"] = platform
        super().__init__(name=name, config=config)

Classes

class Kafka (name: str = 'kafka', image: str = 'confluentinc/cp-kafka', tag: str = '7.5.2', ports: list[str | int] | None = None, allow_host_ports: bool = False, auto_create_topics: bool = False, broker_id: int = 1, offsets_topic_replication_factor: int = 1, advertised_listeners: list[str] = [], environment: list[str] = ['KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181', 'KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false', 'KAFKA_MIN_INSYNC_REPLICAS=1', 'KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1', 'KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1', 'KAFKA_MESSAGE_MAX_BYTES=15728640', 'KAFKA_REPLICA_FETCH_MAX_BYTES=15728640', 'KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100'], environment_extra: list[str] = [], depends_on_extra: list[str] = [], volumes: list[str] = [], platform: str | None = None)

A Docker Compose service in a Composition.

Attributes

name
The name of the service.
config
The definition of the service.
Expand source code Browse git
class Kafka(Service):
    def __init__(
        self,
        name: str = "kafka",
        image: str = "confluentinc/cp-kafka",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        ports: list[str | int] | None = None,
        allow_host_ports: bool = False,
        auto_create_topics: bool = False,
        broker_id: int = 1,
        offsets_topic_replication_factor: int = 1,
        advertised_listeners: list[str] = [],
        environment: list[str] = [
            "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
            "KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
            "KAFKA_MIN_INSYNC_REPLICAS=1",
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
            "KAFKA_MESSAGE_MAX_BYTES=15728640",
            "KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
            "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
        ],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        if not advertised_listeners:
            advertised_listeners = [f"PLAINTEXT://{name}:9092"]
        environment = [
            *environment,
            f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
            f"KAFKA_BROKER_ID={broker_id}",
            *environment_extra,
        ]
        if ports is None:
            ports = [l.split(":")[-1] for l in advertised_listeners]
        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": ports,
            "allow_host_ports": allow_host_ports,
            "environment": [
                *environment,
                f"KAFKA_AUTO_CREATE_TOPICS_ENABLE={auto_create_topics}",
                f"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={offsets_topic_replication_factor}",
            ],
            "depends_on": {
                "zookeeper": {"condition": "service_healthy"},
                **{s: {"condition": "service_started"} for s in depends_on_extra},
            },
            "healthcheck": {
                "test": ["CMD", "nc", "-z", "localhost", "9092"],
                "interval": "1s",
                "start_period": "120s",
            },
            "volumes": volumes,
        }
        if platform:
            config["platform"] = platform
        super().__init__(name=name, config=config)

Ancestors