misc.python.materialize.zippy.kafka_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import random
 11import string
 12import threading
 13from textwrap import dedent
 14
 15import numpy as np
 16
 17from materialize.mzcompose.composition import Composition
 18from materialize.zippy.framework import (
 19    Action,
 20    ActionFactory,
 21    Capabilities,
 22    Capability,
 23    State,
 24)
 25from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
 26from materialize.zippy.mz_capabilities import MzIsRunning
 27
# Avro key/value schemas shared by every testdrive fragment in this module.
# The key is a single long ("key"); the value carries a long ("f1") plus a
# variable-length string ("pad") used to control record size.
SCHEMA = """
$ set keyschema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"key", "type":"long"}
        ]
    }

$ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f1", "type":"long"},
            {"name":"pad", "type":"string"}
        ]
    }
"""
 46
 47
 48class KafkaStart(Action):
 49    """Start a Kafka instance."""
 50
 51    def provides(self) -> list[Capability]:
 52        return [KafkaRunning()]
 53
 54    def run(self, c: Composition, state: State) -> None:
 55        c.up("redpanda")
 56
 57
class KafkaStop(Action):
    """Stop the Kafka instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        # Can only stop a broker that is currently running.
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        # After this action, Kafka is no longer available to other actions.
        return {KafkaRunning}

    def run(self, c: Composition, state: State) -> None:
        c.kill("redpanda")
 70
 71
 72class CreateTopicParameterized(ActionFactory):
 73    """Creates a Kafka topic and decides on the envelope that will be used."""
 74
 75    @classmethod
 76    def requires(cls) -> set[type[Capability]]:
 77        return {MzIsRunning, KafkaRunning}
 78
 79    def __init__(
 80        self,
 81        max_topics: int = 10,
 82        envelopes_with_weights: dict[Envelope, int] = {
 83            Envelope.NONE: 25,
 84            Envelope.UPSERT: 75,
 85        },
 86    ) -> None:
 87        self.max_topics = max_topics
 88        self.envelopes_with_weights = envelopes_with_weights
 89
 90    def new(self, capabilities: Capabilities) -> list[Action]:
 91        new_topic_name = capabilities.get_free_capability_name(
 92            TopicExists, self.max_topics
 93        )
 94
 95        if new_topic_name:
 96            return [
 97                CreateTopic(
 98                    capabilities=capabilities,
 99                    topic=TopicExists(
100                        name=new_topic_name,
101                        envelope=random.choices(
102                            list(self.envelopes_with_weights.keys()),
103                            weights=list(self.envelopes_with_weights.values()),
104                        )[0],
105                        partitions=random.randint(1, 10),
106                    ),
107                )
108            ]
109        else:
110            return []
111
112
class CreateTopic(Action):
    """Creates the topic described by a TopicExists capability and seeds it
    with a single initial record."""

    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        # Creating the topic makes its TopicExists capability available.
        return [self.topic]

    def run(self, c: Composition, state: State) -> None:
        testdrive_input = SCHEMA + dedent(f"""
                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
                {{"key": 0}} {{"f1": 0, "pad": ""}}
                """)
        c.testdrive(testdrive_input, mz_service=state.mz_service)
130
131
class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        # Operate on one randomly chosen existing topic.
        self.topic = random.choice(capabilities.get(TopicExists))
        self.delta = random.randint(1, 10000)
        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
        pad_length = min(np.random.zipf(1.6, 1)[0], 256)
        pad_char = random.choice(string.ascii_letters)
        self.pad = pad_length * pad_char
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"
150
151
class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        # Overridden by KafkaInsertParallel to run the ingestion in a thread.
        return False

    def run(self, c: Composition, state: State) -> None:
        # Advance the high watermark; the new records occupy
        # (prev_max, prev_max + delta].
        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = prev_max + self.delta
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        testdrive_str = SCHEMA + dedent(f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """)

        if self.parallel():
            # Fix: propagate mz_service to the background invocation as well,
            # matching the synchronous path below — previously the thread ran
            # testdrive against the default service.
            threading.Thread(
                target=c.testdrive,
                args=[testdrive_str],
                kwargs={"mz_service": state.mz_service},
            ).start()
        else:
            c.testdrive(testdrive_str, mz_service=state.mz_service)
173
174
class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    @classmethod
    def require_explicit_mention(cls) -> bool:
        # Parallel inserts are opt-in: only used when a Scenario names this
        # action explicitly.
        return True

    def parallel(self) -> bool:
        return True
184
185
class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        # In-place updates require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return

        head = self.topic.watermarks.max
        # Clamp the update window so it never extends below the tail.
        start = max(head - self.delta, self.topic.watermarks.min)
        actual_delta = head - start
        if actual_delta <= 0:
            return

        c.testdrive(
            SCHEMA + dedent(f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
                {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                """),
            mz_service=state.mz_service,
        )
205
206
class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        # Deletions (null values) require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return

        # Move the high watermark down, but never below the low watermark.
        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = max(prev_max - self.delta, self.topic.watermarks.min)
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        actual_delta = prev_max - self.topic.watermarks.max
        if actual_delta <= 0:
            return

        # A key with no value produces a deletion in upsert semantics.
        c.testdrive(
            SCHEMA + dedent(f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
                {{"key": ${{kafka-ingest.iteration}}}}
                """),
            mz_service=state.mz_service,
        )
231
232
class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        # In-place updates require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return

        tail = self.topic.watermarks.min
        # Clamp the update window so it never extends above the head.
        end = min(tail + self.delta, self.topic.watermarks.max)
        actual_delta = end - tail
        if actual_delta <= 0:
            return

        c.testdrive(
            SCHEMA + dedent(f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
                {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                """),
            mz_service=state.mz_service,
        )
252
253
class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        # Deletions (null values) require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return

        # Move the low watermark up, but never above the high watermark.
        prev_min = self.topic.watermarks.min
        self.topic.watermarks.min = min(prev_min + self.delta, self.topic.watermarks.max)
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        actual_delta = self.topic.watermarks.min - prev_min
        if actual_delta <= 0:
            return

        # A key with no value produces a deletion in upsert semantics.
        c.testdrive(
            SCHEMA + dedent(f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
                {{"key": ${{kafka-ingest.iteration}}}}
                """),
            mz_service=state.mz_service,
        )
SCHEMA = '\n$ set keyschema={\n "type" : "record",\n "name" : "test",\n "fields" : [\n {"name":"key", "type":"long"}\n ]\n }\n\n$ set schema={\n "type" : "record",\n "name" : "test",\n "fields" : [\n {"name":"f1", "type":"long"},\n {"name":"pad", "type":"string"}\n ]\n }\n'
class KafkaStart(materialize.zippy.framework.Action):
49class KafkaStart(Action):
50    """Start a Kafka instance."""
51
52    def provides(self) -> list[Capability]:
53        return [KafkaRunning()]
54
55    def run(self, c: Composition, state: State) -> None:
56        c.up("redpanda")

Start a Kafka instance.

def provides(self) -> list[materialize.zippy.framework.Capability]:
52    def provides(self) -> list[Capability]:
53        return [KafkaRunning()]

Compute the capabilities that this action will make available.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
55    def run(self, c: Composition, state: State) -> None:
56        c.up("redpanda")

Run this action on the provided composition.

class KafkaStop(materialize.zippy.framework.Action):
59class KafkaStop(Action):
60    """Stop the Kafka instance."""
61
62    @classmethod
63    def requires(cls) -> set[type[Capability]]:
64        return {KafkaRunning}
65
66    def withholds(self) -> set[type[Capability]]:
67        return {KafkaRunning}
68
69    def run(self, c: Composition, state: State) -> None:
70        c.kill("redpanda")

Stop the Kafka instance.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
62    @classmethod
63    def requires(cls) -> set[type[Capability]]:
64        return {KafkaRunning}

Compute the capability classes that this action requires.

def withholds(self) -> set[type[materialize.zippy.framework.Capability]]:
66    def withholds(self) -> set[type[Capability]]:
67        return {KafkaRunning}

Compute the capability classes that this action will make unavailable.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
69    def run(self, c: Composition, state: State) -> None:
70        c.kill("redpanda")

Run this action on the provided composition.

class CreateTopicParameterized(materialize.zippy.framework.ActionFactory):
 73class CreateTopicParameterized(ActionFactory):
 74    """Creates a Kafka topic and decides on the envelope that will be used."""
 75
 76    @classmethod
 77    def requires(cls) -> set[type[Capability]]:
 78        return {MzIsRunning, KafkaRunning}
 79
 80    def __init__(
 81        self,
 82        max_topics: int = 10,
 83        envelopes_with_weights: dict[Envelope, int] = {
 84            Envelope.NONE: 25,
 85            Envelope.UPSERT: 75,
 86        },
 87    ) -> None:
 88        self.max_topics = max_topics
 89        self.envelopes_with_weights = envelopes_with_weights
 90
 91    def new(self, capabilities: Capabilities) -> list[Action]:
 92        new_topic_name = capabilities.get_free_capability_name(
 93            TopicExists, self.max_topics
 94        )
 95
 96        if new_topic_name:
 97            return [
 98                CreateTopic(
 99                    capabilities=capabilities,
100                    topic=TopicExists(
101                        name=new_topic_name,
102                        envelope=random.choices(
103                            list(self.envelopes_with_weights.keys()),
104                            weights=list(self.envelopes_with_weights.values()),
105                        )[0],
106                        partitions=random.randint(1, 10),
107                    ),
108                )
109            ]
110        else:
111            return []

Creates a Kafka topic and decides on the envelope that will be used.

CreateTopicParameterized( max_topics: int = 10, envelopes_with_weights: dict[materialize.zippy.kafka_capabilities.Envelope, int] = {<Envelope.NONE: 1>: 25, <Envelope.UPSERT: 2>: 75})
80    def __init__(
81        self,
82        max_topics: int = 10,
83        envelopes_with_weights: dict[Envelope, int] = {
84            Envelope.NONE: 25,
85            Envelope.UPSERT: 75,
86        },
87    ) -> None:
88        self.max_topics = max_topics
89        self.envelopes_with_weights = envelopes_with_weights
@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
76    @classmethod
77    def requires(cls) -> set[type[Capability]]:
78        return {MzIsRunning, KafkaRunning}

Compute the capability classes that this Action Factory requires.

max_topics
envelopes_with_weights
def new( self, capabilities: materialize.zippy.framework.Capabilities) -> list[materialize.zippy.framework.Action]:
 91    def new(self, capabilities: Capabilities) -> list[Action]:
 92        new_topic_name = capabilities.get_free_capability_name(
 93            TopicExists, self.max_topics
 94        )
 95
 96        if new_topic_name:
 97            return [
 98                CreateTopic(
 99                    capabilities=capabilities,
100                    topic=TopicExists(
101                        name=new_topic_name,
102                        envelope=random.choices(
103                            list(self.envelopes_with_weights.keys()),
104                            weights=list(self.envelopes_with_weights.values()),
105                        )[0],
106                        partitions=random.randint(1, 10),
107                    ),
108                )
109            ]
110        else:
111            return []
class CreateTopic(materialize.zippy.framework.Action):
114class CreateTopic(Action):
115    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
116        self.topic = topic
117        super().__init__(capabilities)
118
119    def provides(self) -> list[Capability]:
120        return [self.topic]
121
122    def run(self, c: Composition, state: State) -> None:
123        c.testdrive(
124            SCHEMA + dedent(f"""
125                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
126                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
127                {{"key": 0}} {{"f1": 0, "pad": ""}}
128                """),
129            mz_service=state.mz_service,
130        )

Base class for an action that a Zippy test can take.

CreateTopic( capabilities: materialize.zippy.framework.Capabilities, topic: materialize.zippy.kafka_capabilities.TopicExists)
115    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
116        self.topic = topic
117        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

topic
def provides(self) -> list[materialize.zippy.framework.Capability]:
119    def provides(self) -> list[Capability]:
120        return [self.topic]

Compute the capabilities that this action will make available.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
122    def run(self, c: Composition, state: State) -> None:
123        c.testdrive(
124            SCHEMA + dedent(f"""
125                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
126                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
127                {{"key": 0}} {{"f1": 0, "pad": ""}}
128                """),
129            mz_service=state.mz_service,
130        )

Run this action on the provided composition.

class Ingest(materialize.zippy.framework.Action):
133class Ingest(Action):
134    """Ingests data (inserts, updates or deletions) into a Kafka topic."""
135
136    @classmethod
137    def requires(cls) -> set[type[Capability]]:
138        return {MzIsRunning, KafkaRunning, TopicExists}
139
140    def __init__(self, capabilities: Capabilities) -> None:
141        self.topic = random.choice(capabilities.get(TopicExists))
142        self.delta = random.randint(1, 10000)
143        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
144        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
145            string.ascii_letters
146        )
147        super().__init__(capabilities)
148
149    def __str__(self) -> str:
150        return f"{Action.__str__(self)} {self.topic.name}"

Ingests data (inserts, updates or deletions) into a Kafka topic.

Ingest(capabilities: materialize.zippy.framework.Capabilities)
140    def __init__(self, capabilities: Capabilities) -> None:
141        self.topic = random.choice(capabilities.get(TopicExists))
142        self.delta = random.randint(1, 10000)
143        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
144        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
145            string.ascii_letters
146        )
147        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
136    @classmethod
137    def requires(cls) -> set[type[Capability]]:
138        return {MzIsRunning, KafkaRunning, TopicExists}

Compute the capability classes that this action requires.

topic
delta
pad
class KafkaInsert(Ingest):
153class KafkaInsert(Ingest):
154    """Inserts data into a Kafka topic."""
155
156    def parallel(self) -> bool:
157        return False
158
159    def run(self, c: Composition, state: State) -> None:
160        prev_max = self.topic.watermarks.max
161        self.topic.watermarks.max = prev_max + self.delta
162        assert self.topic.watermarks.max >= 0
163        assert self.topic.watermarks.min >= 0
164
165        testdrive_str = SCHEMA + dedent(f"""
166            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
167            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
168            """)
169
170        if self.parallel():
171            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
172        else:
173            c.testdrive(testdrive_str, mz_service=state.mz_service)

Inserts data into a Kafka topic.

def parallel(self) -> bool:
156    def parallel(self) -> bool:
157        return False
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
159    def run(self, c: Composition, state: State) -> None:
160        prev_max = self.topic.watermarks.max
161        self.topic.watermarks.max = prev_max + self.delta
162        assert self.topic.watermarks.max >= 0
163        assert self.topic.watermarks.min >= 0
164
165        testdrive_str = SCHEMA + dedent(f"""
166            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
167            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
168            """)
169
170        if self.parallel():
171            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
172        else:
173            c.testdrive(testdrive_str, mz_service=state.mz_service)

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaInsertParallel(KafkaInsert):
176class KafkaInsertParallel(KafkaInsert):
177    """Inserts data into a Kafka topic using background threads."""
178
179    @classmethod
180    def require_explicit_mention(cls) -> bool:
181        return True
182
183    def parallel(self) -> bool:
184        return True

Inserts data into a Kafka topic using background threads.

@classmethod
def require_explicit_mention(cls) -> bool:
179    @classmethod
180    def require_explicit_mention(cls) -> bool:
181        return True

Only use if explicitly mentioned by name in a Scenario.

def parallel(self) -> bool:
183    def parallel(self) -> bool:
184        return True
class KafkaUpsertFromHead(Ingest):
187class KafkaUpsertFromHead(Ingest):
188    """Updates records from the head in-place by modifying their pad"""
189
190    def run(self, c: Composition, state: State) -> None:
191        if self.topic.envelope is Envelope.NONE:
192            return
193
194        head = self.topic.watermarks.max
195        start = max(head - self.delta, self.topic.watermarks.min)
196        actual_delta = head - start
197
198        if actual_delta > 0:
199            c.testdrive(
200                SCHEMA + dedent(f"""
201                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
202                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
203                    """),
204                mz_service=state.mz_service,
205            )

Updates records from the head in-place by modifying their pad

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
190    def run(self, c: Composition, state: State) -> None:
191        if self.topic.envelope is Envelope.NONE:
192            return
193
194        head = self.topic.watermarks.max
195        start = max(head - self.delta, self.topic.watermarks.min)
196        actual_delta = head - start
197
198        if actual_delta > 0:
199            c.testdrive(
200                SCHEMA + dedent(f"""
201                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
202                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
203                    """),
204                mz_service=state.mz_service,
205            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaDeleteFromHead(Ingest):
208class KafkaDeleteFromHead(Ingest):
209    """Deletes the largest values previously inserted."""
210
211    def run(self, c: Composition, state: State) -> None:
212        if self.topic.envelope is Envelope.NONE:
213            return
214
215        prev_max = self.topic.watermarks.max
216        self.topic.watermarks.max = max(
217            prev_max - self.delta, self.topic.watermarks.min
218        )
219        assert self.topic.watermarks.max >= 0
220        assert self.topic.watermarks.min >= 0
221
222        actual_delta = prev_max - self.topic.watermarks.max
223
224        if actual_delta > 0:
225            c.testdrive(
226                SCHEMA + dedent(f"""
227                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
228                    {{"key": ${{kafka-ingest.iteration}}}}
229                    """),
230                mz_service=state.mz_service,
231            )

Deletes the largest values previously inserted.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
211    def run(self, c: Composition, state: State) -> None:
212        if self.topic.envelope is Envelope.NONE:
213            return
214
215        prev_max = self.topic.watermarks.max
216        self.topic.watermarks.max = max(
217            prev_max - self.delta, self.topic.watermarks.min
218        )
219        assert self.topic.watermarks.max >= 0
220        assert self.topic.watermarks.min >= 0
221
222        actual_delta = prev_max - self.topic.watermarks.max
223
224        if actual_delta > 0:
225            c.testdrive(
226                SCHEMA + dedent(f"""
227                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
228                    {{"key": ${{kafka-ingest.iteration}}}}
229                    """),
230                mz_service=state.mz_service,
231            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaUpsertFromTail(Ingest):
234class KafkaUpsertFromTail(Ingest):
235    """Updates records from the tail in-place by modifying their pad"""
236
237    def run(self, c: Composition, state: State) -> None:
238        if self.topic.envelope is Envelope.NONE:
239            return
240
241        tail = self.topic.watermarks.min
242        end = min(tail + self.delta, self.topic.watermarks.max)
243        actual_delta = end - tail
244
245        if actual_delta > 0:
246            c.testdrive(
247                SCHEMA + dedent(f"""
248                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
249                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
250                    """),
251                mz_service=state.mz_service,
252            )

Updates records from the tail in-place by modifying their pad

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
237    def run(self, c: Composition, state: State) -> None:
238        if self.topic.envelope is Envelope.NONE:
239            return
240
241        tail = self.topic.watermarks.min
242        end = min(tail + self.delta, self.topic.watermarks.max)
243        actual_delta = end - tail
244
245        if actual_delta > 0:
246            c.testdrive(
247                SCHEMA + dedent(f"""
248                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
249                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
250                    """),
251                mz_service=state.mz_service,
252            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad
class KafkaDeleteFromTail(Ingest):
255class KafkaDeleteFromTail(Ingest):
256    """Deletes the smallest values previously inserted."""
257
258    def run(self, c: Composition, state: State) -> None:
259        if self.topic.envelope is Envelope.NONE:
260            return
261
262        prev_min = self.topic.watermarks.min
263        self.topic.watermarks.min = min(
264            prev_min + self.delta, self.topic.watermarks.max
265        )
266        assert self.topic.watermarks.max >= 0
267        assert self.topic.watermarks.min >= 0
268        actual_delta = self.topic.watermarks.min - prev_min
269
270        if actual_delta > 0:
271            c.testdrive(
272                SCHEMA + dedent(f"""
273                   $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
274                   {{"key": ${{kafka-ingest.iteration}}}}
275                   """),
276                mz_service=state.mz_service,
277            )

Deletes the smallest values previously inserted.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
258    def run(self, c: Composition, state: State) -> None:
259        if self.topic.envelope is Envelope.NONE:
260            return
261
262        prev_min = self.topic.watermarks.min
263        self.topic.watermarks.min = min(
264            prev_min + self.delta, self.topic.watermarks.max
265        )
266        assert self.topic.watermarks.max >= 0
267        assert self.topic.watermarks.min >= 0
268        actual_delta = self.topic.watermarks.min - prev_min
269
270        if actual_delta > 0:
271            c.testdrive(
272                SCHEMA + dedent(f"""
273                   $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
274                   {{"key": ${{kafka-ingest.iteration}}}}
275                   """),
276                mz_service=state.mz_service,
277            )

Run this action on the provided composition.

Inherited Members
Ingest
Ingest
requires
topic
delta
pad