misc.python.materialize.zippy.kafka_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import string
import threading
from textwrap import dedent

import numpy as np

from materialize.mzcompose.composition import Composition
from materialize.zippy.framework import (
    Action,
    ActionFactory,
    Capabilities,
    Capability,
    State,
)
from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning

SCHEMA = """
$ set keyschema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"key", "type":"long"}
    ]
}

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"long"},
        {"name":"pad", "type":"string"}
    ]
}
"""
class KafkaStart(Action):
    """Start a Kafka instance."""

    def provides(self) -> list[Capability]:
        return [KafkaRunning()]

    def run(self, c: Composition, state: State) -> None:
        c.up("redpanda")
Start a Kafka instance.
class KafkaStop(Action):
    """Stop the Kafka instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        return {KafkaRunning}

    def run(self, c: Composition, state: State) -> None:
        c.kill("redpanda")
Stop the Kafka instance.
requires(): Compute the capability classes that this action requires.
class CreateTopicParameterized(ActionFactory):
    """Creates a Kafka topic and decides on the envelope that will be used."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning}

    def __init__(
        self,
        max_topics: int = 10,
        envelopes_with_weights: dict[Envelope, int] = {
            Envelope.NONE: 25,
            Envelope.UPSERT: 75,
        },
    ) -> None:
        self.max_topics = max_topics
        self.envelopes_with_weights = envelopes_with_weights

    def new(self, capabilities: Capabilities) -> list[Action]:
        new_topic_name = capabilities.get_free_capability_name(
            TopicExists, self.max_topics
        )

        if new_topic_name:
            return [
                CreateTopic(
                    capabilities=capabilities,
                    topic=TopicExists(
                        name=new_topic_name,
                        envelope=random.choices(
                            list(self.envelopes_with_weights.keys()),
                            weights=list(self.envelopes_with_weights.values()),
                        )[0],
                        partitions=random.randint(1, 10),
                    ),
                )
            ]
        else:
            return []
Creates a Kafka topic and decides on the envelope that will be used.
requires(): Compute the capability classes that this Action Factory requires.
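The weighted envelope choice in CreateTopicParameterized.new can be sanity-checked with a standalone sketch; the Envelope members are replaced by plain strings here, purely for illustration, and the weights are the module's defaults:

import random
from collections import Counter

# Stand-ins for Envelope.NONE / Envelope.UPSERT, for illustration only.
envelopes_with_weights = {"NONE": 25, "UPSERT": 75}

picks = Counter(
    random.choices(
        list(envelopes_with_weights.keys()),
        weights=list(envelopes_with_weights.values()),
    )[0]
    for _ in range(10_000)
)
print(picks)  # roughly 25% NONE, 75% UPSERT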
class CreateTopic(Action):
    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        return [self.topic]

    def run(self, c: Composition, state: State) -> None:
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
                $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
                {{"key": 0}} {{"f1": 0, "pad": ""}}
                """
            ),
            mz_service=state.mz_service,
        )
Creates the Kafka topic described by the provided TopicExists capability and ingests a single initial record.
__init__(): Construct a new action, possibly conditioning on the available capabilities.
provides(): Compute the capabilities that this action will make available.
run(): Run this action on the provided composition.
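To make the templating in CreateTopic.run concrete, the sketch below renders the same testdrive fragment for a hypothetical topic (the name and partition count are invented for the example); in the real action the SCHEMA prelude defining ${keyschema} and ${schema} is prepended before the string is passed to c.testdrive:

from textwrap import dedent

# Hypothetical topic attributes, purely for illustration.
name, partitions = "zippy-topic-0", 3

fragment = dedent(
    f"""
    $ kafka-create-topic topic={name} partitions={partitions}
    $ kafka-ingest format=avro key-format=avro topic={name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
    {{"key": 0}} {{"f1": 0, "pad": ""}}
    """
)
print(fragment)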
class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.topic = random.choice(capabilities.get(TopicExists))
        self.delta = random.randint(1, 10000)
        # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
        self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
            string.ascii_letters
        )
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"
Ingests data (inserts, updates or deletions) into a Kafka topic.
__init__(): Construct a new action, possibly conditioning on the available capabilities.
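The pad built in Ingest.__init__ is a single letter repeated a Zipf-distributed number of times, capped at 256. A standalone sketch (not part of the module) that samples the same distribution and builds one pad the same way:

import random
import string

import numpy as np

# Sample the same capped Zipf(1.6) length distribution that Ingest uses.
lengths = np.minimum(np.random.zipf(1.6, 100_000), 256)
print("share of pads <= 10 bytes: ", (lengths <= 10).mean())
print("share of pads <= 100 bytes:", (lengths <= 100).mean())
print("largest pad seen:", lengths.max())

# One pad, constructed as in Ingest.__init__.
pad = int(min(np.random.zipf(1.6, 1)[0], 256)) * random.choice(string.ascii_letters)
print(repr(pad))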
class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        return False

    def run(self, c: Composition, state: State) -> None:
        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = prev_max + self.delta
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        testdrive_str = SCHEMA + dedent(
            f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """
        )

        if self.parallel():
            threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
        else:
            c.testdrive(testdrive_str, mz_service=state.mz_service)
Inserts data into a Kafka topic.
run(): Run this action on the provided composition.
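With invented watermark values, the arithmetic in KafkaInsert.run works out as follows: start-iteration=prev_max + 1 together with repeat=delta appends exactly delta new keys and advances the max watermark by the same amount. A minimal sketch:

# Hypothetical watermarks, for illustration only.
prev_max, delta = 1_000, 250

new_max = prev_max + delta
# start-iteration=prev_max + 1 and repeat=delta ingest exactly these keys:
inserted_keys = range(prev_max + 1, prev_max + delta + 1)

assert new_max == 1_250
assert len(inserted_keys) == delta and list(inserted_keys)[:3] == [1_001, 1_002, 1_003]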
class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    @classmethod
    def require_explicit_mention(cls) -> bool:
        return True

    def parallel(self) -> bool:
        return True
Inserts data into a Kafka topic using background threads.
class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        head = self.topic.watermarks.max
        start = max(head - self.delta, self.topic.watermarks.min)
        actual_delta = head - start

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                ),
                mz_service=state.mz_service,
            )
Updates records from the head in-place by modifying their pad
run(): Run this action on the provided composition.
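A worked example of the clamping in KafkaUpsertFromHead.run, with invented watermarks: when delta exceeds the number of live keys, only the range between the min watermark and the head is rewritten.

# Hypothetical watermarks, for illustration only.
head, low, delta = 500, 450, 10_000

start = max(head - delta, low)  # clamped at the min watermark: 450
actual_delta = head - start     # 50 keys (450..499) receive a new pad

assert (start, actual_delta) == (450, 50)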
class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        prev_max = self.topic.watermarks.max
        self.topic.watermarks.max = max(
            prev_max - self.delta, self.topic.watermarks.min
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0

        actual_delta = prev_max - self.topic.watermarks.max

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}}
                    """
                ),
                mz_service=state.mz_service,
            )
Deletes the largest values previously inserted.
run(): Run this action on the provided composition.
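Because KafkaDeleteFromHead only runs for the UPSERT envelope, the key-only records it ingests act as deletions for the removed range: it lowers the max watermark and re-emits just the keys above the new maximum. A sketch with invented numbers:

# Hypothetical watermarks, for illustration only.
prev_max, low, delta = 800, 0, 300

new_max = max(prev_max - delta, low)             # 500
deleted_keys = range(new_max + 1, prev_max + 1)  # key-only records for 501..800

assert new_max == 500 and len(deleted_keys) == prev_max - new_max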
class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        tail = self.topic.watermarks.min
        end = min(tail + self.delta, self.topic.watermarks.max)
        actual_delta = end - tail

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                ),
                mz_service=state.mz_service,
            )
Updates records from the tail in-place by modifying their pad
run(): Run this action on the provided composition.
class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition, state: State) -> None:
        if self.topic.envelope is Envelope.NONE:
            return

        prev_min = self.topic.watermarks.min
        self.topic.watermarks.min = min(
            prev_min + self.delta, self.topic.watermarks.max
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0
        actual_delta = self.topic.watermarks.min - prev_min

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}}
                    """
                ),
                mz_service=state.mz_service,
            )
Deletes the smallest values previously inserted.
run(): Run this action on the provided composition.