Module materialize.zippy.kafka_capabilities

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from enum import Enum

from materialize.zippy.framework import Capability
from materialize.zippy.watermarks import Watermarks


class KafkaRunning(Capability):
    """Kafka is running in the environment."""

    pass


class Envelope(Enum):
    """Kafka envelope to be used for a particular topic or source.

    If the Envelope is NONE, no deletions take place on the topic, just insertions
    """

    NONE = 1
    UPSERT = 2


class TopicExists(Capability):
    """A Topic exists on the Kafka instance."""

    @classmethod
    def format_str(cls) -> str:
        return "topic-{}"

    def __init__(self, name: str, partitions: int, envelope: Envelope) -> None:
        self.name = name
        self.partitions = partitions
        self.envelope = envelope
        self.watermarks = Watermarks()

    def get_watermarks(self) -> Watermarks:
        return self.watermarks

Classes

class Envelope (*args, **kwds)

Kafka envelope to be used for a particular topic or source.

If the Envelope is NONE, no deletions take place on the topic, just insertions

Expand source code Browse git
class Envelope(Enum):
    """Kafka envelope to be used for a particular topic or source.

    If the Envelope is NONE, no deletions take place on the topic, just insertions
    """

    NONE = 1
    UPSERT = 2

Ancestors

  • enum.Enum

Class variables

var NONE
var UPSERT
class KafkaRunning

Kafka is running in the environment.

Expand source code Browse git
class KafkaRunning(Capability):
    """Kafka is running in the environment."""

    pass

Ancestors

class TopicExists (name: str, partitions: int, envelope: Envelope)

A Topic exists on the Kafka instance.

Expand source code Browse git
class TopicExists(Capability):
    """A Topic exists on the Kafka instance."""

    @classmethod
    def format_str(cls) -> str:
        return "topic-{}"

    def __init__(self, name: str, partitions: int, envelope: Envelope) -> None:
        self.name = name
        self.partitions = partitions
        self.envelope = envelope
        self.watermarks = Watermarks()

    def get_watermarks(self) -> Watermarks:
        return self.watermarks

Ancestors

Static methods

def format_str() ‑> str
Expand source code Browse git
@classmethod
def format_str(cls) -> str:
    return "topic-{}"

Methods

def get_watermarks(self) ‑> Watermarks
Expand source code Browse git
def get_watermarks(self) -> Watermarks:
    return self.watermarks