misc.python.materialize.zippy.kafka_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import random
 11import string
 12import threading
 13from textwrap import dedent
 14
 15import numpy as np
 16
 17from materialize.mzcompose.composition import Composition
 18from materialize.zippy.framework import (
 19    Action,
 20    ActionFactory,
 21    Capabilities,
 22    Capability,
 23    State,
 24)
 25from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
 26from materialize.zippy.mz_capabilities import MzIsRunning
 27
 28SCHEMA = """
 29$ set keyschema={
 30        "type" : "record",
 31        "name" : "test",
 32        "fields" : [
 33            {"name":"key", "type":"long"}
 34        ]
 35    }
 36
 37$ set schema={
 38        "type" : "record",
 39        "name" : "test",
 40        "fields" : [
 41            {"name":"f1", "type":"long"},
 42            {"name":"pad", "type":"string"}
 43        ]
 44    }
 45"""
 46
 47
 48class KafkaStart(Action):
 49    """Start a Kafka instance."""
 50
 51    def provides(self) -> list[Capability]:
 52        return [KafkaRunning()]
 53
 54    def run(self, c: Composition, state: State) -> None:
 55        c.up("redpanda")
 56
 57
 58class KafkaStop(Action):
 59    """Stop the Kafka instance."""
 60
 61    @classmethod
 62    def requires(cls) -> set[type[Capability]]:
 63        return {KafkaRunning}
 64
 65    def withholds(self) -> set[type[Capability]]:
 66        return {KafkaRunning}
 67
 68    def run(self, c: Composition, state: State) -> None:
 69        c.kill("redpanda")
 70
 71
 72class CreateTopicParameterized(ActionFactory):
 73    """Creates a Kafka topic and decides on the envelope that will be used."""
 74
 75    @classmethod
 76    def requires(cls) -> set[type[Capability]]:
 77        return {MzIsRunning, KafkaRunning}
 78
 79    def __init__(
 80        self,
 81        max_topics: int = 10,
 82        envelopes_with_weights: dict[Envelope, int] = {
 83            Envelope.NONE: 25,
 84            Envelope.UPSERT: 75,
 85        },
 86    ) -> None:
 87        self.max_topics = max_topics
 88        self.envelopes_with_weights = envelopes_with_weights
 89
 90    def new(self, capabilities: Capabilities) -> list[Action]:
 91        new_topic_name = capabilities.get_free_capability_name(
 92            TopicExists, self.max_topics
 93        )
 94
 95        if new_topic_name:
 96            return [
 97                CreateTopic(
 98                    capabilities=capabilities,
 99                    topic=TopicExists(
100                        name=new_topic_name,
101                        envelope=random.choices(
102                            list(self.envelopes_with_weights.keys()),
103                            weights=list(self.envelopes_with_weights.values()),
104                        )[0],
105                        partitions=random.randint(1, 10),
106                    ),
107                )
108            ]
109        else:
110            return []
111
112
113class CreateTopic(Action):
114    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
115        self.topic = topic
116        super().__init__(capabilities)
117
118    def provides(self) -> list[Capability]:
119        return [self.topic]
120
121    def run(self, c: Composition, state: State) -> None:
122        c.testdrive(
123            SCHEMA
124            + dedent(
125                f"""
126                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
127                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
128                {{"key": 0}} {{"f1": 0, "pad": ""}}
129                """
130            ),
131            mz_service=state.mz_service,
132        )
133
134
135class Ingest(Action):
136    """Ingests data (inserts, updates or deletions) into a Kafka topic."""
137
138    @classmethod
139    def requires(cls) -> set[type[Capability]]:
140        return {MzIsRunning, KafkaRunning, TopicExists}
141
142    def __init__(self, capabilities: Capabilities) -> None:
143        self.topic = random.choice(capabilities.get(TopicExists))
144        self.delta = random.randint(1, 10000)
145        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
146        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
147            string.ascii_letters
148        )
149        super().__init__(capabilities)
150
151    def __str__(self) -> str:
152        return f"{Action.__str__(self)} {self.topic.name}"
153
154
155class KafkaInsert(Ingest):
156    """Inserts data into a Kafka topic."""
157
158    def parallel(self) -> bool:
159        return False
160
161    def run(self, c: Composition, state: State) -> None:
162        prev_max = self.topic.watermarks.max
163        self.topic.watermarks.max = prev_max + self.delta
164        assert self.topic.watermarks.max >= 0
165        assert self.topic.watermarks.min >= 0
166
167        testdrive_str = SCHEMA + dedent(
168            f"""
169            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
170            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
171            """
172        )
173
174        if self.parallel():
175            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
176        else:
177            c.testdrive(testdrive_str, mz_service=state.mz_service)
178
179
180class KafkaInsertParallel(KafkaInsert):
181    """Inserts data into a Kafka topic using background threads."""
182
183    @classmethod
184    def require_explicit_mention(cls) -> bool:
185        return True
186
187    def parallel(self) -> bool:
188        return True
189
190
191class KafkaUpsertFromHead(Ingest):
192    """Updates records from the head in-place by modifying their pad"""
193
194    def run(self, c: Composition, state: State) -> None:
195        if self.topic.envelope is Envelope.NONE:
196            return
197
198        head = self.topic.watermarks.max
199        start = max(head - self.delta, self.topic.watermarks.min)
200        actual_delta = head - start
201
202        if actual_delta > 0:
203            c.testdrive(
204                SCHEMA
205                + dedent(
206                    f"""
207                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
208                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
209                    """
210                ),
211                mz_service=state.mz_service,
212            )
213
214
215class KafkaDeleteFromHead(Ingest):
216    """Deletes the largest values previously inserted."""
217
218    def run(self, c: Composition, state: State) -> None:
219        if self.topic.envelope is Envelope.NONE:
220            return
221
222        prev_max = self.topic.watermarks.max
223        self.topic.watermarks.max = max(
224            prev_max - self.delta, self.topic.watermarks.min
225        )
226        assert self.topic.watermarks.max >= 0
227        assert self.topic.watermarks.min >= 0
228
229        actual_delta = prev_max - self.topic.watermarks.max
230
231        if actual_delta > 0:
232            c.testdrive(
233                SCHEMA
234                + dedent(
235                    f"""
236                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
237                    {{"key": ${{kafka-ingest.iteration}}}}
238                    """
239                ),
240                mz_service=state.mz_service,
241            )
242
243
244class KafkaUpsertFromTail(Ingest):
245    """Updates records from the tail in-place by modifying their pad"""
246
247    def run(self, c: Composition, state: State) -> None:
248        if self.topic.envelope is Envelope.NONE:
249            return
250
251        tail = self.topic.watermarks.min
252        end = min(tail + self.delta, self.topic.watermarks.max)
253        actual_delta = end - tail
254
255        if actual_delta > 0:
256            c.testdrive(
257                SCHEMA
258                + dedent(
259                    f"""
260                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
261                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
262                    """
263                ),
264                mz_service=state.mz_service,
265            )
266
267
268class KafkaDeleteFromTail(Ingest):
269    """Deletes the smallest values previously inserted."""
270
271    def run(self, c: Composition, state: State) -> None:
272        if self.topic.envelope is Envelope.NONE:
273            return
274
275        prev_min = self.topic.watermarks.min
276        self.topic.watermarks.min = min(
277            prev_min + self.delta, self.topic.watermarks.max
278        )
279        assert self.topic.watermarks.max >= 0
280        assert self.topic.watermarks.min >= 0
281        actual_delta = self.topic.watermarks.min - prev_min
282
283        if actual_delta > 0:
284            c.testdrive(
285                SCHEMA
286                + dedent(
287                    f"""
288                   $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
289                   {{"key": ${{kafka-ingest.iteration}}}}
290                   """
291                ),
292                mz_service=state.mz_service,
293            )
SCHEMA = '\n$ set keyschema={\n "type" : "record",\n "name" : "test",\n "fields" : [\n {"name":"key", "type":"long"}\n ]\n }\n\n$ set schema={\n "type" : "record",\n "name" : "test",\n "fields" : [\n {"name":"f1", "type":"long"},\n {"name":"pad", "type":"string"}\n ]\n }\n'
class KafkaStart(materialize.zippy.framework.Action):
49class KafkaStart(Action):
50    """Start a Kafka instance."""
51
52    def provides(self) -> list[Capability]:
53        return [KafkaRunning()]
54
55    def run(self, c: Composition, state: State) -> None:
56        c.up("redpanda")

Start a Kafka instance.

def provides(self) -> list[materialize.zippy.framework.Capability]:
52    def provides(self) -> list[Capability]:
53        return [KafkaRunning()]

Compute the capabilities that this action will make available.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
55    def run(self, c: Composition, state: State) -> None:
56        c.up("redpanda")

Run this action on the provided composition.

class KafkaStop(materialize.zippy.framework.Action):
59class KafkaStop(Action):
60    """Stop the Kafka instance."""
61
62    @classmethod
63    def requires(cls) -> set[type[Capability]]:
64        return {KafkaRunning}
65
66    def withholds(self) -> set[type[Capability]]:
67        return {KafkaRunning}
68
69    def run(self, c: Composition, state: State) -> None:
70        c.kill("redpanda")

Stop the Kafka instance.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
62    @classmethod
63    def requires(cls) -> set[type[Capability]]:
64        return {KafkaRunning}

Compute the capability classes that this action requires.

def withholds(self) -> set[type[materialize.zippy.framework.Capability]]:
66    def withholds(self) -> set[type[Capability]]:
67        return {KafkaRunning}

Compute the capability classes that this action will make unavailable.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
69    def run(self, c: Composition, state: State) -> None:
70        c.kill("redpanda")

Run this action on the provided composition.

class CreateTopicParameterized(materialize.zippy.framework.ActionFactory):
 73class CreateTopicParameterized(ActionFactory):
 74    """Creates a Kafka topic and decides on the envelope that will be used."""
 75
 76    @classmethod
 77    def requires(cls) -> set[type[Capability]]:
 78        return {MzIsRunning, KafkaRunning}
 79
 80    def __init__(
 81        self,
 82        max_topics: int = 10,
 83        envelopes_with_weights: dict[Envelope, int] = {
 84            Envelope.NONE: 25,
 85            Envelope.UPSERT: 75,
 86        },
 87    ) -> None:
 88        self.max_topics = max_topics
 89        self.envelopes_with_weights = envelopes_with_weights
 90
 91    def new(self, capabilities: Capabilities) -> list[Action]:
 92        new_topic_name = capabilities.get_free_capability_name(
 93            TopicExists, self.max_topics
 94        )
 95
 96        if new_topic_name:
 97            return [
 98                CreateTopic(
 99                    capabilities=capabilities,
100                    topic=TopicExists(
101                        name=new_topic_name,
102                        envelope=random.choices(
103                            list(self.envelopes_with_weights.keys()),
104                            weights=list(self.envelopes_with_weights.values()),
105                        )[0],
106                        partitions=random.randint(1, 10),
107                    ),
108                )
109            ]
110        else:
111            return []

Creates a Kafka topic and decides on the envelope that will be used.

CreateTopicParameterized( max_topics: int = 10, envelopes_with_weights: dict[materialize.zippy.kafka_capabilities.Envelope, int] = {<Envelope.NONE: 1>: 25, <Envelope.UPSERT: 2>: 75})
80    def __init__(
81        self,
82        max_topics: int = 10,
83        envelopes_with_weights: dict[Envelope, int] = {
84            Envelope.NONE: 25,
85            Envelope.UPSERT: 75,
86        },
87    ) -> None:
88        self.max_topics = max_topics
89        self.envelopes_with_weights = envelopes_with_weights
@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
76    @classmethod
77    def requires(cls) -> set[type[Capability]]:
78        return {MzIsRunning, KafkaRunning}

Compute the capability classes that this Action Factory requires.

max_topics
envelopes_with_weights
def new( self, capabilities: materialize.zippy.framework.Capabilities) -> list[materialize.zippy.framework.Action]:
 91    def new(self, capabilities: Capabilities) -> list[Action]:
 92        new_topic_name = capabilities.get_free_capability_name(
 93            TopicExists, self.max_topics
 94        )
 95
 96        if new_topic_name:
 97            return [
 98                CreateTopic(
 99                    capabilities=capabilities,
100                    topic=TopicExists(
101                        name=new_topic_name,
102                        envelope=random.choices(
103                            list(self.envelopes_with_weights.keys()),
104                            weights=list(self.envelopes_with_weights.values()),
105                        )[0],
106                        partitions=random.randint(1, 10),
107                    ),
108                )
109            ]
110        else:
111            return []
class CreateTopic(materialize.zippy.framework.Action):
114class CreateTopic(Action):
115    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
116        self.topic = topic
117        super().__init__(capabilities)
118
119    def provides(self) -> list[Capability]:
120        return [self.topic]
121
122    def run(self, c: Composition, state: State) -> None:
123        c.testdrive(
124            SCHEMA
125            + dedent(
126                f"""
127                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
128                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
129                {{"key": 0}} {{"f1": 0, "pad": ""}}
130                """
131            ),
132            mz_service=state.mz_service,
133        )

Base class for an action that a Zippy test can take.

CreateTopic( capabilities: materialize.zippy.framework.Capabilities, topic: materialize.zippy.kafka_capabilities.TopicExists)
115    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
116        self.topic = topic
117        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

topic
def provides(self) -> list[materialize.zippy.framework.Capability]:
119    def provides(self) -> list[Capability]:
120        return [self.topic]

Compute the capabilities that this action will make available.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
122    def run(self, c: Composition, state: State) -> None:
123        c.testdrive(
124            SCHEMA
125            + dedent(
126                f"""
127                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
128                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
129                {{"key": 0}} {{"f1": 0, "pad": ""}}
130                """
131            ),
132            mz_service=state.mz_service,
133        )

Run this action on the provided composition.

class Ingest(materialize.zippy.framework.Action):
136class Ingest(Action):
137    """Ingests data (inserts, updates or deletions) into a Kafka topic."""
138
139    @classmethod
140    def requires(cls) -> set[type[Capability]]:
141        return {MzIsRunning, KafkaRunning, TopicExists}
142
143    def __init__(self, capabilities: Capabilities) -> None:
144        self.topic = random.choice(capabilities.get(TopicExists))
145        self.delta = random.randint(1, 10000)
146        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
147        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
148            string.ascii_letters
149        )
150        super().__init__(capabilities)
151
152    def __str__(self) -> str:
153        return f"{Action.__str__(self)} {self.topic.name}"

Ingests data (inserts, updates or deletions) into a Kafka topic.

Ingest(capabilities: materialize.zippy.framework.Capabilities)
143    def __init__(self, capabilities: Capabilities) -> None:
144        self.topic = random.choice(capabilities.get(TopicExists))
145        self.delta = random.randint(1, 10000)
146        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
147        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
148            string.ascii_letters
149        )
150        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
139    @classmethod
140    def requires(cls) -> set[type[Capability]]:
141        return {MzIsRunning, KafkaRunning, TopicExists}

Compute the capability classes that this action requires.

topic
delta
pad
class KafkaInsert(Ingest):
156class KafkaInsert(Ingest):
157    """Inserts data into a Kafka topic."""
158
159    def parallel(self) -> bool:
160        return False
161
162    def run(self, c: Composition, state: State) -> None:
163        prev_max = self.topic.watermarks.max
164        self.topic.watermarks.max = prev_max + self.delta
165        assert self.topic.watermarks.max >= 0
166        assert self.topic.watermarks.min >= 0
167
168        testdrive_str = SCHEMA + dedent(
169            f"""
170            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
171            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
172            """
173        )
174
175        if self.parallel():
176            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
177        else:
178            c.testdrive(testdrive_str, mz_service=state.mz_service)

Inserts data into a Kafka topic.

def parallel(self) -> bool:
159    def parallel(self) -> bool:
160        return False
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
162    def run(self, c: Composition, state: State) -> None:
163        prev_max = self.topic.watermarks.max
164        self.topic.watermarks.max = prev_max + self.delta
165        assert self.topic.watermarks.max >= 0
166        assert self.topic.watermarks.min >= 0
167
168        testdrive_str = SCHEMA + dedent(
169            f"""
170            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
171            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
172            """
173        )
174
175        if self.parallel():
176            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
177        else:
178            c.testdrive(testdrive_str, mz_service=state.mz_service)

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaInsertParallel(KafkaInsert):
181class KafkaInsertParallel(KafkaInsert):
182    """Inserts data into a Kafka topic using background threads."""
183
184    @classmethod
185    def require_explicit_mention(cls) -> bool:
186        return True
187
188    def parallel(self) -> bool:
189        return True

Inserts data into a Kafka topic using background threads.

@classmethod
def require_explicit_mention(cls) -> bool:
184    @classmethod
185    def require_explicit_mention(cls) -> bool:
186        return True

Only use if explicitly mentioned by name in a Scenario.

def parallel(self) -> bool:
188    def parallel(self) -> bool:
189        return True
class KafkaUpsertFromHead(Ingest):
192class KafkaUpsertFromHead(Ingest):
193    """Updates records from the head in-place by modifying their pad"""
194
195    def run(self, c: Composition, state: State) -> None:
196        if self.topic.envelope is Envelope.NONE:
197            return
198
199        head = self.topic.watermarks.max
200        start = max(head - self.delta, self.topic.watermarks.min)
201        actual_delta = head - start
202
203        if actual_delta > 0:
204            c.testdrive(
205                SCHEMA
206                + dedent(
207                    f"""
208                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
209                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
210                    """
211                ),
212                mz_service=state.mz_service,
213            )

Updates records from the head in-place by modifying their pad

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
195    def run(self, c: Composition, state: State) -> None:
196        if self.topic.envelope is Envelope.NONE:
197            return
198
199        head = self.topic.watermarks.max
200        start = max(head - self.delta, self.topic.watermarks.min)
201        actual_delta = head - start
202
203        if actual_delta > 0:
204            c.testdrive(
205                SCHEMA
206                + dedent(
207                    f"""
208                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
209                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
210                    """
211                ),
212                mz_service=state.mz_service,
213            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaDeleteFromHead(Ingest):
216class KafkaDeleteFromHead(Ingest):
217    """Deletes the largest values previously inserted."""
218
219    def run(self, c: Composition, state: State) -> None:
220        if self.topic.envelope is Envelope.NONE:
221            return
222
223        prev_max = self.topic.watermarks.max
224        self.topic.watermarks.max = max(
225            prev_max - self.delta, self.topic.watermarks.min
226        )
227        assert self.topic.watermarks.max >= 0
228        assert self.topic.watermarks.min >= 0
229
230        actual_delta = prev_max - self.topic.watermarks.max
231
232        if actual_delta > 0:
233            c.testdrive(
234                SCHEMA
235                + dedent(
236                    f"""
237                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
238                    {{"key": ${{kafka-ingest.iteration}}}}
239                    """
240                ),
241                mz_service=state.mz_service,
242            )

Deletes the largest values previously inserted.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
219    def run(self, c: Composition, state: State) -> None:
220        if self.topic.envelope is Envelope.NONE:
221            return
222
223        prev_max = self.topic.watermarks.max
224        self.topic.watermarks.max = max(
225            prev_max - self.delta, self.topic.watermarks.min
226        )
227        assert self.topic.watermarks.max >= 0
228        assert self.topic.watermarks.min >= 0
229
230        actual_delta = prev_max - self.topic.watermarks.max
231
232        if actual_delta > 0:
233            c.testdrive(
234                SCHEMA
235                + dedent(
236                    f"""
237                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
238                    {{"key": ${{kafka-ingest.iteration}}}}
239                    """
240                ),
241                mz_service=state.mz_service,
242            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaUpsertFromTail(Ingest):
245class KafkaUpsertFromTail(Ingest):
246    """Updates records from the tail in-place by modifying their pad"""
247
248    def run(self, c: Composition, state: State) -> None:
249        if self.topic.envelope is Envelope.NONE:
250            return
251
252        tail = self.topic.watermarks.min
253        end = min(tail + self.delta, self.topic.watermarks.max)
254        actual_delta = end - tail
255
256        if actual_delta > 0:
257            c.testdrive(
258                SCHEMA
259                + dedent(
260                    f"""
261                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
262                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
263                    """
264                ),
265                mz_service=state.mz_service,
266            )

Updates records from the tail in-place by modifying their pad

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
248    def run(self, c: Composition, state: State) -> None:
249        if self.topic.envelope is Envelope.NONE:
250            return
251
252        tail = self.topic.watermarks.min
253        end = min(tail + self.delta, self.topic.watermarks.max)
254        actual_delta = end - tail
255
256        if actual_delta > 0:
257            c.testdrive(
258                SCHEMA
259                + dedent(
260                    f"""
261                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
262                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
263                    """
264                ),
265                mz_service=state.mz_service,
266            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaDeleteFromTail(Ingest):
269class KafkaDeleteFromTail(Ingest):
270    """Deletes the smallest values previously inserted."""
271
272    def run(self, c: Composition, state: State) -> None:
273        if self.topic.envelope is Envelope.NONE:
274            return
275
276        prev_min = self.topic.watermarks.min
277        self.topic.watermarks.min = min(
278            prev_min + self.delta, self.topic.watermarks.max
279        )
280        assert self.topic.watermarks.max >= 0
281        assert self.topic.watermarks.min >= 0
282        actual_delta = self.topic.watermarks.min - prev_min
283
284        if actual_delta > 0:
285            c.testdrive(
286                SCHEMA
287                + dedent(
288                    f"""
289                   $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
290                   {{"key": ${{kafka-ingest.iteration}}}}
291                   """
292                ),
293                mz_service=state.mz_service,
294            )

Deletes the smallest values previously inserted.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
272    def run(self, c: Composition, state: State) -> None:
273        if self.topic.envelope is Envelope.NONE:
274            return
275
276        prev_min = self.topic.watermarks.min
277        self.topic.watermarks.min = min(
278            prev_min + self.delta, self.topic.watermarks.max
279        )
280        assert self.topic.watermarks.max >= 0
281        assert self.topic.watermarks.min >= 0
282        actual_delta = self.topic.watermarks.min - prev_min
283
284        if actual_delta > 0:
285            c.testdrive(
286                SCHEMA
287                + dedent(
288                    f"""
289                   $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
290                   {{"key": ${{kafka-ingest.iteration}}}}
291                   """
292                ),
293                mz_service=state.mz_service,
294            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad