misc.python.materialize.zippy.kafka_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import string
import threading
from textwrap import dedent

import numpy as np

from materialize.mzcompose.composition import Composition
from materialize.zippy.framework import (
    Action,
    ActionFactory,
    Capabilities,
    Capability,
    State,
)
from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning

# Avro key/value schemas shared by every testdrive fragment in this module.
# The key is a single long; the value carries a long plus a variable-length
# string pad used to control record size.
SCHEMA = """
$ set keyschema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"key", "type":"long"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"long"},
        {"name":"pad", "type":"string"}
    ]
  }
"""


class KafkaStart(Action):
    """Start a Kafka instance (Redpanda serves as the Kafka-compatible broker)."""

    def provides(self) -> list[Capability]:
        return [KafkaRunning()]

    def run(self, c: Composition, state: State) -> None:
        c.up("redpanda")


class KafkaStop(Action):
    """Stop the Kafka instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        # Once the broker is down, actions that need Kafka must not run.
        return {KafkaRunning}

    def run(self, c: Composition, state: State) -> None:
        c.kill("redpanda")


class CreateTopicParameterized(ActionFactory):
    """Creates a Kafka topic and decides on the envelope that will be used."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning}

    def __init__(
        self,
        max_topics: int = 10,
        envelopes_with_weights: dict[Envelope, int] | None = None,
    ) -> None:
        """Initialize the factory.

        Args:
            max_topics: upper bound on the number of topics this factory
                will create.
            envelopes_with_weights: mapping from envelope to its relative
                selection weight. Defaults to 25% NONE / 75% UPSERT.
        """
        self.max_topics = max_topics
        # Use a None sentinel instead of a mutable dict default argument:
        # a dict literal in the signature would be shared across instances.
        self.envelopes_with_weights = (
            {Envelope.NONE: 25, Envelope.UPSERT: 75}
            if envelopes_with_weights is None
            else envelopes_with_weights
        )

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a CreateTopic action for a fresh topic name, or [] if the
        max_topics limit has been reached."""
        new_topic_name = capabilities.get_free_capability_name(
            TopicExists, self.max_topics
        )

        if new_topic_name:
            return [
                CreateTopic(
                    capabilities=capabilities,
                    topic=TopicExists(
                        name=new_topic_name,
                        # Pick the envelope randomly according to the
                        # configured weights.
                        envelope=random.choices(
                            list(self.envelopes_with_weights.keys()),
                            weights=list(self.envelopes_with_weights.values()),
                        )[0],
                        partitions=random.randint(1, 10),
                    ),
                )
            ]
        else:
            return []


class CreateTopic(Action):
    """Creates the Kafka topic described by a TopicExists capability and
    seeds it with a single record."""

    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        return [self.topic]

    def run(self, c: Composition, state: State) -> None:
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
                {{"key": 0}} {{"f1": 0, "pad": ""}}
                """
            ),
            mz_service=state.mz_service,
        )


class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        # Operate on a randomly chosen existing topic.
        self.topic = random.choice(capabilities.get(TopicExists))
        # Number of records to insert/update/delete in one action.
        self.delta = random.randint(1, 10000)
        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
            string.ascii_letters
        )
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"


class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        # Subclasses may override to run the ingestion in a background thread.
        return False

    def run(self, c: Composition, state: State) -> None:
        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = prev_max + self.delta
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        testdrive_str = SCHEMA + dedent(
            f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """
        )

        if self.parallel():
            # Pass mz_service explicitly so the background thread targets the
            # same Mz instance as the synchronous path; previously the kwarg
            # was dropped here.
            threading.Thread(
                target=c.testdrive,
                args=[testdrive_str],
                kwargs={"mz_service": state.mz_service},
            ).start()
        else:
            c.testdrive(testdrive_str, mz_service=state.mz_service)


class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    @classmethod
    def require_explicit_mention(cls) -> bool:
        # Only used when explicitly requested in a Zippy scenario.
        return True

    def parallel(self) -> bool:
        return True


class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        # Upserts only make sense with ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return

        head = self.topic.watermarks.max
        # Never reach below the current minimum watermark.
        start = max(head - self.delta, self.topic.watermarks.min)
        actual_delta = head - start

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                ),
                mz_service=state.mz_service,
            )


class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        # Deletions require ENVELOPE UPSERT (a null value is a deletion).
        if self.topic.envelope is Envelope.NONE:
            return

        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = max(
            prev_max - self.delta, self.topic.watermarks.min
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        actual_delta = prev_max - self.topic.watermarks.max

        if actual_delta > 0:
            # Emitting a key with no value produces a deletion under UPSERT.
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}}
                    """
                ),
                mz_service=state.mz_service,
            )


class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        tail = self.topic.watermarks.min
        # Never reach above the current maximum watermark.
        end = min(tail + self.delta, self.topic.watermarks.max)
        actual_delta = end - tail

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                ),
                mz_service=state.mz_service,
            )


class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        prev_min = self.topic.watermarks.min
        self.topic.watermarks.min = min(
            prev_min + self.delta, self.topic.watermarks.max
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0
        actual_delta = self.topic.watermarks.min - prev_min

        if actual_delta > 0:
            # Emitting a key with no value produces a deletion under UPSERT.
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}}
                    """
                ),
                mz_service=state.mz_service,
            )
49class KafkaStart(Action): 50 """Start a Kafka instance.""" 51 52 def provides(self) -> list[Capability]: 53 return [KafkaRunning()] 54 55 def run(self, c: Composition, state: State) -> None: 56 c.up("redpanda")
Start a Kafka instance.
59class KafkaStop(Action): 60 """Stop the Kafka instance.""" 61 62 @classmethod 63 def requires(cls) -> set[type[Capability]]: 64 return {KafkaRunning} 65 66 def withholds(self) -> set[type[Capability]]: 67 return {KafkaRunning} 68 69 def run(self, c: Composition, state: State) -> None: 70 c.kill("redpanda")
Stop the Kafka instance.
Compute the capability classes that this action requires.
73class CreateTopicParameterized(ActionFactory): 74 """Creates a Kafka topic and decides on the envelope that will be used.""" 75 76 @classmethod 77 def requires(cls) -> set[type[Capability]]: 78 return {MzIsRunning, KafkaRunning} 79 80 def __init__( 81 self, 82 max_topics: int = 10, 83 envelopes_with_weights: dict[Envelope, int] = { 84 Envelope.NONE: 25, 85 Envelope.UPSERT: 75, 86 }, 87 ) -> None: 88 self.max_topics = max_topics 89 self.envelopes_with_weights = envelopes_with_weights 90 91 def new(self, capabilities: Capabilities) -> list[Action]: 92 new_topic_name = capabilities.get_free_capability_name( 93 TopicExists, self.max_topics 94 ) 95 96 if new_topic_name: 97 return [ 98 CreateTopic( 99 capabilities=capabilities, 100 topic=TopicExists( 101 name=new_topic_name, 102 envelope=random.choices( 103 list(self.envelopes_with_weights.keys()), 104 weights=list(self.envelopes_with_weights.values()), 105 )[0], 106 partitions=random.randint(1, 10), 107 ), 108 ) 109 ] 110 else: 111 return []
Creates a Kafka topic and decides on the envelope that will be used.
76 @classmethod 77 def requires(cls) -> set[type[Capability]]: 78 return {MzIsRunning, KafkaRunning}
Compute the capability classes that this Action Factory requires.
91 def new(self, capabilities: Capabilities) -> list[Action]: 92 new_topic_name = capabilities.get_free_capability_name( 93 TopicExists, self.max_topics 94 ) 95 96 if new_topic_name: 97 return [ 98 CreateTopic( 99 capabilities=capabilities, 100 topic=TopicExists( 101 name=new_topic_name, 102 envelope=random.choices( 103 list(self.envelopes_with_weights.keys()), 104 weights=list(self.envelopes_with_weights.values()), 105 )[0], 106 partitions=random.randint(1, 10), 107 ), 108 ) 109 ] 110 else: 111 return []
114class CreateTopic(Action): 115 def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None: 116 self.topic = topic 117 super().__init__(capabilities) 118 119 def provides(self) -> list[Capability]: 120 return [self.topic] 121 122 def run(self, c: Composition, state: State) -> None: 123 c.testdrive( 124 SCHEMA + dedent(f""" 125 $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions} 126 $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1 127 {{"key": 0}} {{"f1": 0, "pad": ""}} 128 """), 129 mz_service=state.mz_service, 130 )
Base class for an action that a Zippy test can take.
115 def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None: 116 self.topic = topic 117 super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
Compute the capabilities that this action will make available.
122 def run(self, c: Composition, state: State) -> None: 123 c.testdrive( 124 SCHEMA + dedent(f""" 125 $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions} 126 $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1 127 {{"key": 0}} {{"f1": 0, "pad": ""}} 128 """), 129 mz_service=state.mz_service, 130 )
Run this action on the provided composition.
133class Ingest(Action): 134 """Ingests data (inserts, updates or deletions) into a Kafka topic.""" 135 136 @classmethod 137 def requires(cls) -> set[type[Capability]]: 138 return {MzIsRunning, KafkaRunning, TopicExists} 139 140 def __init__(self, capabilities: Capabilities) -> None: 141 self.topic = random.choice(capabilities.get(TopicExists)) 142 self.delta = random.randint(1, 10000) 143 # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes 144 self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice( 145 string.ascii_letters 146 ) 147 super().__init__(capabilities) 148 149 def __str__(self) -> str: 150 return f"{Action.__str__(self)} {self.topic.name}"
Ingests data (inserts, updates or deletions) into a Kafka topic.
140 def __init__(self, capabilities: Capabilities) -> None: 141 self.topic = random.choice(capabilities.get(TopicExists)) 142 self.delta = random.randint(1, 10000) 143 # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes 144 self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice( 145 string.ascii_letters 146 ) 147 super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
153class KafkaInsert(Ingest): 154 """Inserts data into a Kafka topic.""" 155 156 def parallel(self) -> bool: 157 return False 158 159 def run(self, c: Composition, state: State) -> None: 160 prev_max = self.topic.watermarks.max 161 self.topic.watermarks.max = prev_max + self.delta 162 assert self.topic.watermarks.max >= 0 163 assert self.topic.watermarks.min >= 0 164 165 testdrive_str = SCHEMA + dedent(f""" 166 $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta} 167 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}} 168 """) 169 170 if self.parallel(): 171 threading.Thread(target=c.testdrive, args=[testdrive_str]).start() 172 else: 173 c.testdrive(testdrive_str, mz_service=state.mz_service)
Inserts data into a Kafka topic.
159 def run(self, c: Composition, state: State) -> None: 160 prev_max = self.topic.watermarks.max 161 self.topic.watermarks.max = prev_max + self.delta 162 assert self.topic.watermarks.max >= 0 163 assert self.topic.watermarks.min >= 0 164 165 testdrive_str = SCHEMA + dedent(f""" 166 $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta} 167 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}} 168 """) 169 170 if self.parallel(): 171 threading.Thread(target=c.testdrive, args=[testdrive_str]).start() 172 else: 173 c.testdrive(testdrive_str, mz_service=state.mz_service)
Run this action on the provided composition.
176class KafkaInsertParallel(KafkaInsert): 177 """Inserts data into a Kafka topic using background threads.""" 178 179 @classmethod 180 def require_explicit_mention(cls) -> bool: 181 return True 182 183 def parallel(self) -> bool: 184 return True
Inserts data into a Kafka topic using background threads.
187class KafkaUpsertFromHead(Ingest): 188 """Updates records from the head in-place by modifying their pad""" 189 190 def run(self, c: Composition, state: State) -> None: 191 if self.topic.envelope is Envelope.NONE: 192 return 193 194 head = self.topic.watermarks.max 195 start = max(head - self.delta, self.topic.watermarks.min) 196 actual_delta = head - start 197 198 if actual_delta > 0: 199 c.testdrive( 200 SCHEMA + dedent(f""" 201 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta} 202 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} 203 """), 204 mz_service=state.mz_service, 205 )
Updates records from the head in-place by modifying their pad
190 def run(self, c: Composition, state: State) -> None: 191 if self.topic.envelope is Envelope.NONE: 192 return 193 194 head = self.topic.watermarks.max 195 start = max(head - self.delta, self.topic.watermarks.min) 196 actual_delta = head - start 197 198 if actual_delta > 0: 199 c.testdrive( 200 SCHEMA + dedent(f""" 201 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta} 202 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} 203 """), 204 mz_service=state.mz_service, 205 )
Run this action on the provided composition.
208class KafkaDeleteFromHead(Ingest): 209 """Deletes the largest values previously inserted.""" 210 211 def run(self, c: Composition, state: State) -> None: 212 if self.topic.envelope is Envelope.NONE: 213 return 214 215 prev_max = self.topic.watermarks.max 216 self.topic.watermarks.max = max( 217 prev_max - self.delta, self.topic.watermarks.min 218 ) 219 assert self.topic.watermarks.max >= 0 220 assert self.topic.watermarks.min >= 0 221 222 actual_delta = prev_max - self.topic.watermarks.max 223 224 if actual_delta > 0: 225 c.testdrive( 226 SCHEMA + dedent(f""" 227 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta} 228 {{"key": ${{kafka-ingest.iteration}}}} 229 """), 230 mz_service=state.mz_service, 231 )
Deletes the largest values previously inserted.
211 def run(self, c: Composition, state: State) -> None: 212 if self.topic.envelope is Envelope.NONE: 213 return 214 215 prev_max = self.topic.watermarks.max 216 self.topic.watermarks.max = max( 217 prev_max - self.delta, self.topic.watermarks.min 218 ) 219 assert self.topic.watermarks.max >= 0 220 assert self.topic.watermarks.min >= 0 221 222 actual_delta = prev_max - self.topic.watermarks.max 223 224 if actual_delta > 0: 225 c.testdrive( 226 SCHEMA + dedent(f""" 227 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta} 228 {{"key": ${{kafka-ingest.iteration}}}} 229 """), 230 mz_service=state.mz_service, 231 )
Run this action on the provided composition.
234class KafkaUpsertFromTail(Ingest): 235 """Updates records from the tail in-place by modifying their pad""" 236 237 def run(self, c: Composition, state: State) -> None: 238 if self.topic.envelope is Envelope.NONE: 239 return 240 241 tail = self.topic.watermarks.min 242 end = min(tail + self.delta, self.topic.watermarks.max) 243 actual_delta = end - tail 244 245 if actual_delta > 0: 246 c.testdrive( 247 SCHEMA + dedent(f""" 248 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta} 249 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} 250 """), 251 mz_service=state.mz_service, 252 )
Updates records from the tail in-place by modifying their pad
237 def run(self, c: Composition, state: State) -> None: 238 if self.topic.envelope is Envelope.NONE: 239 return 240 241 tail = self.topic.watermarks.min 242 end = min(tail + self.delta, self.topic.watermarks.max) 243 actual_delta = end - tail 244 245 if actual_delta > 0: 246 c.testdrive( 247 SCHEMA + dedent(f""" 248 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta} 249 {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} 250 """), 251 mz_service=state.mz_service, 252 )
Run this action on the provided composition.
255class KafkaDeleteFromTail(Ingest): 256 """Deletes the smallest values previously inserted.""" 257 258 def run(self, c: Composition, state: State) -> None: 259 if self.topic.envelope is Envelope.NONE: 260 return 261 262 prev_min = self.topic.watermarks.min 263 self.topic.watermarks.min = min( 264 prev_min + self.delta, self.topic.watermarks.max 265 ) 266 assert self.topic.watermarks.max >= 0 267 assert self.topic.watermarks.min >= 0 268 actual_delta = self.topic.watermarks.min - prev_min 269 270 if actual_delta > 0: 271 c.testdrive( 272 SCHEMA + dedent(f""" 273 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta} 274 {{"key": ${{kafka-ingest.iteration}}}} 275 """), 276 mz_service=state.mz_service, 277 )
Deletes the smallest values previously inserted.
258 def run(self, c: Composition, state: State) -> None: 259 if self.topic.envelope is Envelope.NONE: 260 return 261 262 prev_min = self.topic.watermarks.min 263 self.topic.watermarks.min = min( 264 prev_min + self.delta, self.topic.watermarks.max 265 ) 266 assert self.topic.watermarks.max >= 0 267 assert self.topic.watermarks.min >= 0 268 actual_delta = self.topic.watermarks.min - prev_min 269 270 if actual_delta > 0: 271 c.testdrive( 272 SCHEMA + dedent(f""" 273 $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta} 274 {{"key": ${{kafka-ingest.iteration}}}} 275 """), 276 mz_service=state.mz_service, 277 )
Run this action on the provided composition.