misc.python.materialize.zippy.table_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import (
    Action,
    ActionFactory,
    Capabilities,
    Capability,
    State,
)
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.table_capabilities import TableExists

# Default upper bound for the row delta a single DML action may use.
MAX_ROWS_PER_ACTION = 10000


class CreateTableParameterized(ActionFactory):
    """Factory producing CreateTable actions for fresh table names, up to `max_tables` tables.

    Each new table is given a default index with probability 1/2.
    """

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        self.max_tables = max_tables
        self.max_rows_per_action = max_rows_per_action

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return one CreateTable action, or an empty list when no free table name is available."""
        new_table_name = capabilities.get_free_capability_name(
            TableExists, self.max_tables
        )

        # A falsy name means the factory has nothing to create right now.
        if not new_table_name:
            return []

        return [
            CreateTable(
                capabilities=capabilities,
                table=TableExists(
                    name=new_table_name,
                    has_index=random.choice([True, False]),
                    max_rows_per_action=self.max_rows_per_action,
                ),
            )
        ]


class CreateTable(Action):
    """Creates a table on the Mz instance. 50% of the tables have a default index."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        assert (
            table is not None
        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Create the table, optionally add a default index, and seed one row.

        The seed row holds the table's current max watermark.
        """
        index = (
            f"> CREATE DEFAULT INDEX ON {self.table.name}"
            if self.table.has_index
            else ""
        )
        c.testdrive(
            dedent(
                f"""
                > CREATE TABLE {self.table.name} (f1 INTEGER);
                {index}
                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
                """
            ),
            mz_service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        return [self.table]


class ValidateTable(Action):
    """Validates that a single table contains data that is consistent with the expected min/max watermark."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        if table is not None:
            self.table = table
        else:
            self.table = random.choice(capabilities.get(TableExists))

        # 20% of validations go through the (more expensive) SELECT ... LIMIT path.
        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        # The expected result treats the table as holding each integer in
        # [min, max] exactly once: COUNT and COUNT(DISTINCT) both equal the range width.
        watermarks = self.table.watermarks
        row_count = (watermarks.max - watermarks.min) + 1
        expected = f"{watermarks.min} {watermarks.max} {row_count} {row_count}"

        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
        # Therefore, only use it in 20% of validations.
        if self.select_limit:
            c.testdrive(
                dedent(
                    f"""
                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
                    {expected}
                    > DROP TABLE {self.table.name}_select_limit
                    """
                ),
                mz_service=state.mz_service,
            )
        else:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
                    {expected}
                    """
                ),
                mz_service=state.mz_service,
            )


class DML(Action):
    """Performs an INSERT, DELETE or UPDATE against a table."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        # Random existing table and a random delta in [1, max_rows_per_action]
        # (random.randint is inclusive on both ends).
        self.table = random.choice(capabilities.get(TableExists))
        self.delta = random.randint(1, self.table.max_rows_per_action)
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{super().__str__()} {self.table.name}"


class Insert(DML):
    """Inserts rows into a table."""

    def run(self, c: Composition, state: State) -> None:
        """Append `delta` consecutive values just above the current max watermark."""
        prev_max = self.table.watermarks.max
        self.table.watermarks.max = prev_max + self.delta
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
            mz_service=state.mz_service,
        )


class ShiftForward(DML):
    """Update all rows from a table by incrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        self.table.watermarks.shift(self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
            mz_service=state.mz_service,
        )


class ShiftBackward(DML):
    """Update all rows from a table by decrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        self.table.watermarks.shift(-self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
            mz_service=state.mz_service,
        )


class DeleteFromHead(DML):
    """Delete the largest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        # Clamp at the min watermark so the row holding the min value survives.
        self.table.watermarks.max = max(
            self.table.watermarks.max - self.delta, self.table.watermarks.min
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
            mz_service=state.mz_service,
        )


class DeleteFromTail(DML):
    """Delete the smallest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        # Clamp at the max watermark so the row holding the max value survives.
        self.table.watermarks.min = min(
            self.table.watermarks.min + self.delta, self.table.watermarks.max
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
            mz_service=state.mz_service,
        )
class CreateTableParameterized(ActionFactory):
    """Factory producing CreateTable actions for fresh table names, up to `max_tables` tables.

    Each new table is given a default index with probability 1/2.
    """

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        self.max_tables = max_tables
        self.max_rows_per_action = max_rows_per_action

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return one CreateTable action, or an empty list when no free table name is available."""
        new_table_name = capabilities.get_free_capability_name(
            TableExists, self.max_tables
        )

        # A falsy name means the factory has nothing to create right now.
        if not new_table_name:
            return []

        return [
            CreateTable(
                capabilities=capabilities,
                table=TableExists(
                    name=new_table_name,
                    has_index=random.choice([True, False]),
                    max_rows_per_action=self.max_rows_per_action,
                ),
            )
        ]
Action factory that, on each call to new(), yields one CreateTable action for a free table name until max_tables tables exist.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Capabilities that must exist before a table can be created."""
    return {BalancerdIsRunning, MzIsRunning}
Compute the capability classes that this Action Factory requires.
def new(self, capabilities: Capabilities) -> list[Action]:
    """Return one CreateTable action, or an empty list when no free table name is available."""
    new_table_name = capabilities.get_free_capability_name(
        TableExists, self.max_tables
    )

    # A falsy name means the factory has nothing to create right now.
    if not new_table_name:
        return []

    return [
        CreateTable(
            capabilities=capabilities,
            table=TableExists(
                name=new_table_name,
                has_index=random.choice([True, False]),
                max_rows_per_action=self.max_rows_per_action,
            ),
        )
    ]
class CreateTable(Action):
    """Creates a table on the Mz instance. 50% of the tables have a default index."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        assert (
            table is not None
        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Create the table, optionally add a default index, and seed one row.

        The seed row holds the table's current max watermark.
        """
        index = (
            f"> CREATE DEFAULT INDEX ON {self.table.name}"
            if self.table.has_index
            else ""
        )
        c.testdrive(
            dedent(
                f"""
                > CREATE TABLE {self.table.name} (f1 INTEGER);
                {index}
                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
                """
            ),
            mz_service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        return [self.table]
Creates a table on the Mz instance. 50% of the tables have a default index.
def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
    """Bind the TableExists capability this action will create.

    The table is supplied by CreateTableParameterized.
    """
    assert (
        table is not None
    ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
    self.table = table
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Capabilities that must exist before CreateTable can run."""
    return {BalancerdIsRunning, MzIsRunning}
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Create the table, optionally add a default index, and seed one row.

    The seed row holds the table's current max watermark.
    """
    index = (
        f"> CREATE DEFAULT INDEX ON {self.table.name}"
        if self.table.has_index
        else ""
    )
    c.testdrive(
        dedent(
            f"""
            > CREATE TABLE {self.table.name} (f1 INTEGER);
            {index}
            > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
            """
        ),
        mz_service=state.mz_service,
    )
Run this action on the provided composition.
class ValidateTable(Action):
    """Validates that a single table contains data that is consistent with the expected min/max watermark."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        if table is not None:
            self.table = table
        else:
            self.table = random.choice(capabilities.get(TableExists))

        # 20% of validations go through the (more expensive) SELECT ... LIMIT path.
        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        # The expected result treats the table as holding each integer in
        # [min, max] exactly once: COUNT and COUNT(DISTINCT) both equal the range width.
        watermarks = self.table.watermarks
        row_count = (watermarks.max - watermarks.min) + 1
        expected = f"{watermarks.min} {watermarks.max} {row_count} {row_count}"

        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
        # Therefore, only use it in 20% of validations.
        if self.select_limit:
            c.testdrive(
                dedent(
                    f"""
                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
                    {expected}
                    > DROP TABLE {self.table.name}_select_limit
                    """
                ),
                mz_service=state.mz_service,
            )
        else:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
                    {expected}
                    """
                ),
                mz_service=state.mz_service,
            )
Validates that a single table contains data that is consistent with the expected min/max watermark.
def __init__(
    self, capabilities: Capabilities, table: TableExists | None = None
) -> None:
    """Pick the table to validate (random unless given) and the validation mode."""
    if table is not None:
        self.table = table
    else:
        self.table = random.choice(capabilities.get(TableExists))

    # 20% of validations go through the (more expensive) SELECT ... LIMIT path.
    self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Capabilities that must exist before a table can be validated."""
    return {BalancerdIsRunning, MzIsRunning, TableExists}
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Check MIN/MAX/COUNT/COUNT(DISTINCT) of f1 against the table's watermarks."""
    # The expected result treats the table as holding each integer in
    # [min, max] exactly once: COUNT and COUNT(DISTINCT) both equal the range width.
    watermarks = self.table.watermarks
    row_count = (watermarks.max - watermarks.min) + 1
    expected = f"{watermarks.min} {watermarks.max} {row_count} {row_count}"

    # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
    # Therefore, only use it in 20% of validations.
    if self.select_limit:
        c.testdrive(
            dedent(
                f"""
                > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
                > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
                > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
                {expected}
                > DROP TABLE {self.table.name}_select_limit
                """
            ),
            mz_service=state.mz_service,
        )
    else:
        c.testdrive(
            dedent(
                f"""
                > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
                {expected}
                """
            ),
            mz_service=state.mz_service,
        )
Run this action on the provided composition.
class DML(Action):
    """Performs an INSERT, DELETE or UPDATE against a table."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        # Random existing table and a random delta in [1, max_rows_per_action]
        # (random.randint is inclusive on both ends).
        self.table = random.choice(capabilities.get(TableExists))
        self.delta = random.randint(1, self.table.max_rows_per_action)
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{super().__str__()} {self.table.name}"
Performs an INSERT, DELETE or UPDATE against a table.
def __init__(self, capabilities: Capabilities) -> None:
    """Choose a random existing table and a random row delta for this DML action."""
    # random.randint is inclusive on both ends: delta is in [1, max_rows_per_action].
    self.table = random.choice(capabilities.get(TableExists))
    self.delta = random.randint(1, self.table.max_rows_per_action)
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
class Insert(DML):
    """Inserts rows into a table."""

    def run(self, c: Composition, state: State) -> None:
        """Append `delta` consecutive values just above the current max watermark."""
        prev_max = self.table.watermarks.max
        self.table.watermarks.max = prev_max + self.delta
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
            mz_service=state.mz_service,
        )
Inserts rows into a table.
def run(self, c: Composition, state: State) -> None:
    """Append `delta` consecutive values just above the current max watermark."""
    prev_max = self.table.watermarks.max
    self.table.watermarks.max = prev_max + self.delta
    c.testdrive(
        f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
        mz_service=state.mz_service,
    )
Run this action on the provided composition.
class ShiftForward(DML):
    """Update all rows from a table by incrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        # Keep the expected watermarks in step with the UPDATE below.
        self.table.watermarks.shift(self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
            mz_service=state.mz_service,
        )
Update all rows from a table by incrementing their values by a constant.
def run(self, c: Composition, state: State) -> None:
    """Add `delta` to every f1 value and shift the tracked watermarks accordingly."""
    self.table.watermarks.shift(self.delta)
    c.testdrive(
        f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
        mz_service=state.mz_service,
    )
Run this action on the provided composition.
class ShiftBackward(DML):
    """Update all rows from a table by decrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        # Keep the expected watermarks in step with the UPDATE below.
        self.table.watermarks.shift(-self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
            mz_service=state.mz_service,
        )
Update all rows from a table by decrementing their values by a constant.
def run(self, c: Composition, state: State) -> None:
    """Subtract `delta` from every f1 value and shift the tracked watermarks accordingly."""
    self.table.watermarks.shift(-self.delta)
    c.testdrive(
        f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
        mz_service=state.mz_service,
    )
Run this action on the provided composition.
class DeleteFromHead(DML):
    """Delete the largest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        # Clamp at the min watermark so the row holding the min value survives.
        self.table.watermarks.max = max(
            self.table.watermarks.max - self.delta, self.table.watermarks.min
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
            mz_service=state.mz_service,
        )
Delete the largest values from a table
def run(self, c: Composition, state: State) -> None:
    """Lower the max watermark by up to `delta` and delete rows above it."""
    # Clamp at the min watermark so the row holding the min value survives.
    self.table.watermarks.max = max(
        self.table.watermarks.max - self.delta, self.table.watermarks.min
    )
    c.testdrive(
        f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
        mz_service=state.mz_service,
    )
Run this action on the provided composition.
class DeleteFromTail(DML):
    """Delete the smallest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        # Clamp at the max watermark so the row holding the max value survives.
        self.table.watermarks.min = min(
            self.table.watermarks.min + self.delta, self.table.watermarks.max
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
            mz_service=state.mz_service,
        )
Delete the smallest values from a table
def run(self, c: Composition, state: State) -> None:
    """Raise the min watermark by up to `delta` and delete rows below it."""
    # Clamp at the max watermark so the row holding the max value survives.
    self.table.watermarks.min = min(
        self.table.watermarks.min + self.delta, self.table.watermarks.max
    )
    c.testdrive(
        f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
        mz_service=state.mz_service,
    )
Run this action on the provided composition.