misc.python.materialize.zippy.table_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Zippy actions that create, validate and modify plain tables.

Each table holds a single INTEGER column ``f1``. The actions keep the
``watermarks.min`` / ``watermarks.max`` values of the corresponding
``TableExists`` capability in step with the statements they issue, and
``ValidateTable`` checks that the table holds exactly one row for every
integer between those two watermarks.
"""

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import (
    Action,
    ActionFactory,
    Capabilities,
    Capability,
    State,
)
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.table_capabilities import TableExists

# Default for CreateTableParameterized's max_rows_per_action argument, which
# is handed to every TableExists capability the factory creates.
MAX_ROWS_PER_ACTION = 10000


class CreateTableParameterized(ActionFactory):
    """Factory that emits CreateTable actions for not-yet-used table names."""

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        """Remember the table limit and the per-action row limit.

        Args:
            max_tables: passed to ``get_free_capability_name`` when looking
                for an unused table name.
            max_rows_per_action: stored on every ``TableExists`` created by
                this factory.
        """
        self.max_tables = max_tables
        self.max_rows_per_action = max_rows_per_action

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this factory requires."""
        return {BalancerdIsRunning, MzIsRunning}

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateTable action, or an empty list.

        An empty list is returned when ``get_free_capability_name`` yields a
        falsy value (presumably because ``max_tables`` tables already exist;
        confirm against the framework).
        """
        new_table_name = capabilities.get_free_capability_name(
            TableExists, self.max_tables
        )

        if new_table_name:
            return [
                CreateTable(
                    capabilities=capabilities,
                    table=TableExists(
                        name=new_table_name,
                        # Coin flip: half of the tables get a default index.
                        has_index=random.choice([True, False]),
                        max_rows_per_action=self.max_rows_per_action,
                    ),
                )
            ]
        else:
            return []


class CreateTable(Action):
    """Creates a table on the Mz instance. 50% of the tables have a default index."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        return {BalancerdIsRunning, MzIsRunning}

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        """Store the table capability this action will create.

        Args:
            table: the capability describing the table; must not be None.
            capabilities: forwarded to the base class initializer.
        """
        assert (
            table is not None
        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Create the table, optionally index it, and insert one initial row."""
        # Empty when no index was requested; that leaves a blank line in the
        # generated script.
        index = (
            f"> CREATE DEFAULT INDEX ON {self.table.name}"
            if self.table.has_index
            else ""
        )
        c.testdrive(
            dedent(f"""
                > CREATE TABLE {self.table.name} (f1 INTEGER);
                {index}
                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
                """),
            mz_service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        """Return the TableExists capability for the table this action creates."""
        return [self.table]


class ValidateTable(Action):
    """Validates that a single table contains data that is consistent with the expected min/max watermark."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        """Pick the table to validate and the validation strategy.

        Args:
            capabilities: source of candidate tables when ``table`` is None.
            table: table to validate; a random existing table is chosen
                when omitted.
        """
        if table is not None:
            self.table = table
        else:
            self.table = random.choice(capabilities.get(TableExists))

        # True with weight 0.2, False with weight 0.8.
        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Check MIN, MAX, COUNT and COUNT(DISTINCT) of ``f1`` against the watermarks.

        The expected count is ``max - min + 1`` for both COUNT columns, i.e.
        one row for every integer between the two watermarks.
        """
        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
        # Therefore, only use it in 20% of validations.
        if self.select_limit:
            c.testdrive(
                dedent(f"""
                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
                    > DROP TABLE {self.table.name}_select_limit
                    """),
                mz_service=state.mz_service,
            )
        else:
            c.testdrive(
                dedent(f"""
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
                    """),
                mz_service=state.mz_service,
            )


class DML(Action):
    """Performs an INSERT, DELETE or UPDATE against a table."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose a random existing table and a random delta for the statement."""
        self.table = random.choice(capabilities.get(TableExists))
        # Delta is drawn from 1..max_rows_per_action inclusive.
        self.delta = random.randint(1, self.table.max_rows_per_action)
        super().__init__(capabilities)

    def __str__(self) -> str:
        """Return the base Action description followed by the table name."""
        return f"{Action.__str__(self)} {self.table.name}"


class Insert(DML):
    """Inserts rows into a table."""

    def run(self, c: Composition, state: State) -> None:
        """Append ``delta`` consecutive values above the current maximum watermark."""
        prev_max = self.table.watermarks.max
        # The watermark is advanced before the statement is issued.
        self.table.watermarks.max = prev_max + self.delta
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
            mz_service=state.mz_service,
        )


class ShiftForward(DML):
    """Update all rows from a table by incrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the watermarks by ``+delta`` and add ``delta`` to every ``f1``."""
        self.table.watermarks.shift(self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
            mz_service=state.mz_service,
        )


class ShiftBackward(DML):
    """Update all rows from a table by decrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the watermarks by ``-delta`` and subtract ``delta`` from every ``f1``."""
        self.table.watermarks.shift(-self.delta)
        c.testdrive(
            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
            mz_service=state.mz_service,
        )


class DeleteFromHead(DML):
    """Delete the largest values from a table"""

    def run(self, c: Composition, state: State) -> None:
        """Lower the maximum watermark by ``delta`` and delete the rows above it."""
        # Clamp so that the maximum never drops below the minimum watermark.
        self.table.watermarks.max = max(
            self.table.watermarks.max - self.delta, self.table.watermarks.min
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
            mz_service=state.mz_service,
        )


class DeleteFromTail(DML):
    """Delete the smallest values from a table"""

    def run(self, c: Composition, state: State) -> None:
        """Raise the minimum watermark by ``delta`` and delete the rows below it."""
        # Clamp so that the minimum never rises above the maximum watermark.
        self.table.watermarks.min = min(
            self.table.watermarks.min + self.delta, self.table.watermarks.max
        )
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
            mz_service=state.mz_service,
        )
class CreateTableParameterized(ActionFactory):
    """Action factory that hands out CreateTable actions for unused table names."""

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        """Record the table limit and the row limit given to each new table."""
        self.max_rows_per_action = max_rows_per_action
        self.max_tables = max_tables

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this factory requires."""
        prerequisites: set[type[Capability]] = {MzIsRunning, BalancerdIsRunning}
        return prerequisites

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return one CreateTable action, or nothing when no table name is free."""
        free_name = capabilities.get_free_capability_name(TableExists, self.max_tables)
        if not free_name:
            return []

        # Half of the tables are created with a default index.
        table = TableExists(
            name=free_name,
            has_index=random.choice([True, False]),
            max_rows_per_action=self.max_rows_per_action,
        )
        return [CreateTable(capabilities=capabilities, table=table)]
Base class for Action Factories that return parameterized Actions to execute.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes this action factory requires."""
    prerequisites: set[type[Capability]] = {MzIsRunning, BalancerdIsRunning}
    return prerequisites
Compute the capability classes that this Action Factory requires.
def new(self, capabilities: Capabilities) -> list[Action]:
    """Return one CreateTable action, or an empty list when no table name is free."""
    free_name = capabilities.get_free_capability_name(TableExists, self.max_tables)
    if not free_name:
        return []

    # Half of the tables are created with a default index.
    table = TableExists(
        name=free_name,
        has_index=random.choice([True, False]),
        max_rows_per_action=self.max_rows_per_action,
    )
    return [CreateTable(capabilities=capabilities, table=table)]
class CreateTable(Action):
    """Create a table on the Mz instance; half of the tables get a default index."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        prerequisites: set[type[Capability]] = {MzIsRunning, BalancerdIsRunning}
        return prerequisites

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        """Keep the table capability to create, then initialise the base action."""
        assert table is not None, (
            "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        )
        # The table is stored before the base initializer runs.
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Create the table, optionally index it, and insert one initial row."""
        table_name = self.table.name

        # Stays empty when no index was requested.
        index_stmt = ""
        if self.table.has_index:
            index_stmt = f"> CREATE DEFAULT INDEX ON {table_name}"

        script = dedent(f"""
            > CREATE TABLE {table_name} (f1 INTEGER);
            {index_stmt}
            > INSERT INTO {table_name} VALUES ({self.table.watermarks.max});
            """)
        c.testdrive(script, mz_service=state.mz_service)

    def provides(self) -> list[Capability]:
        """Return the TableExists capability for the table this action creates."""
        provided: list[Capability] = [self.table]
        return provided
Creates a table on the Mz instance. 50% of the tables have a default index.
def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
    """Store the table capability this action will create.

    Args:
        table: capability describing the table to create; must not be None.
        capabilities: forwarded unchanged to the base class initializer.
    """
    assert (
        table is not None
    ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
    # The table is assigned before the base initializer runs.
    # NOTE(review): presumably the base class may rely on it; confirm.
    self.table = table
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes this action requires."""
    prerequisites: set[type[Capability]] = {MzIsRunning, BalancerdIsRunning}
    return prerequisites
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Create the table, optionally index it, and insert one initial row."""
    table_name = self.table.name

    # Stays empty when no index was requested.
    index_stmt = ""
    if self.table.has_index:
        index_stmt = f"> CREATE DEFAULT INDEX ON {table_name}"

    script = dedent(f"""
        > CREATE TABLE {table_name} (f1 INTEGER);
        {index_stmt}
        > INSERT INTO {table_name} VALUES ({self.table.watermarks.max});
        """)
    c.testdrive(script, mz_service=state.mz_service)
Run this action on the provided composition.
class ValidateTable(Action):
    """Check that one table's contents agree with its min/max watermarks."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        prerequisites: set[type[Capability]] = {
            TableExists,
            MzIsRunning,
            BalancerdIsRunning,
        }
        return prerequisites

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        """Pick the table to validate (random when omitted) and the strategy."""
        self.table = (
            table if table is not None else random.choice(capabilities.get(TableExists))
        )

        # True with weight 0.2, False with weight 0.8.
        (self.select_limit,) = random.choices([True, False], weights=[0.2, 0.8], k=1)
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Compare MIN, MAX, COUNT and COUNT(DISTINCT) of f1 with the watermarks."""
        table_name = self.table.name
        low = self.table.watermarks.min
        high = self.table.watermarks.max
        # One row is expected for every integer between the two watermarks.
        row_count = (high - low) + 1
        expected_row = f"{low} {high} {row_count} {row_count}"

        if not self.select_limit:
            script = dedent(f"""
                > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {table_name};
                {expected_row}
                """)
            c.testdrive(script, mz_service=state.mz_service)
            return

        # Validating via SELECT ... LIMIT needs a temporary table and is
        # expensive, hence it is only selected for 20% of the validations.
        scratch = f"{table_name}_select_limit"
        script = dedent(f"""
            > CREATE TEMPORARY TABLE {scratch} (f1 INTEGER);
            > INSERT INTO {scratch} SELECT * FROM {table_name} LIMIT 999999999;
            > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {scratch};
            {expected_row}
            > DROP TABLE {scratch}
            """)
        c.testdrive(script, mz_service=state.mz_service)
Validates that a single table contains data that is consistent with the expected min/max watermark.
def __init__(
    self, capabilities: Capabilities, table: TableExists | None = None
) -> None:
    """Pick the table to validate (random when omitted) and the strategy."""
    self.table = (
        table if table is not None else random.choice(capabilities.get(TableExists))
    )

    # True with weight 0.2, False with weight 0.8.
    (self.select_limit,) = random.choices([True, False], weights=[0.2, 0.8], k=1)
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes this action requires."""
    prerequisites: set[type[Capability]] = {
        TableExists,
        MzIsRunning,
        BalancerdIsRunning,
    }
    return prerequisites
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Compare MIN, MAX, COUNT and COUNT(DISTINCT) of f1 with the watermarks."""
    table_name = self.table.name
    low = self.table.watermarks.min
    high = self.table.watermarks.max
    # One row is expected for every integer between the two watermarks.
    row_count = (high - low) + 1
    expected_row = f"{low} {high} {row_count} {row_count}"

    if not self.select_limit:
        script = dedent(f"""
            > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {table_name};
            {expected_row}
            """)
        c.testdrive(script, mz_service=state.mz_service)
        return

    # Validating via SELECT ... LIMIT needs a temporary table and is
    # expensive, hence it is only selected for 20% of the validations.
    scratch = f"{table_name}_select_limit"
    script = dedent(f"""
        > CREATE TEMPORARY TABLE {scratch} (f1 INTEGER);
        > INSERT INTO {scratch} SELECT * FROM {table_name} LIMIT 999999999;
        > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {scratch};
        {expected_row}
        > DROP TABLE {scratch}
        """)
    c.testdrive(script, mz_service=state.mz_service)
Run this action on the provided composition.
class DML(Action):
    """Base for actions that run an INSERT, DELETE or UPDATE against a table."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action requires."""
        prerequisites: set[type[Capability]] = {
            TableExists,
            MzIsRunning,
            BalancerdIsRunning,
        }
        return prerequisites

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose a random existing table and a random delta for the statement."""
        candidates = capabilities.get(TableExists)
        self.table = random.choice(candidates)

        # Delta is drawn from 1..max_rows_per_action inclusive.
        row_limit = self.table.max_rows_per_action
        self.delta = random.randint(1, row_limit)
        super().__init__(capabilities)

    def __str__(self) -> str:
        """Return the base Action description followed by the table name."""
        description = Action.__str__(self)
        return f"{description} {self.table.name}"
Performs an INSERT, DELETE or UPDATE against a table.
def __init__(self, capabilities: Capabilities) -> None:
    """Choose a random existing table and a random delta for the statement.

    Args:
        capabilities: source of the TableExists capabilities to choose from;
            also forwarded to the base class initializer.
    """
    # The table is drawn first; the delta bound depends on the chosen table.
    self.table = random.choice(capabilities.get(TableExists))
    # Delta is drawn from 1..max_rows_per_action inclusive.
    self.delta = random.randint(1, self.table.max_rows_per_action)
    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
class Insert(DML):
    """Insert new rows above the table's current maximum value."""

    def run(self, c: Composition, state: State) -> None:
        """Append ``delta`` consecutive values and advance the maximum watermark."""
        watermarks = self.table.watermarks
        old_max = watermarks.max

        # The watermark is advanced before the statement is issued.
        watermarks.max = old_max + self.delta
        statement = f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({old_max + 1}, {watermarks.max});"
        c.testdrive(statement, mz_service=state.mz_service)
Inserts rows into a table.
def run(self, c: Composition, state: State) -> None:
    """Append ``delta`` consecutive values and advance the maximum watermark."""
    watermarks = self.table.watermarks
    old_max = watermarks.max

    # The watermark is advanced before the statement is issued.
    watermarks.max = old_max + self.delta
    statement = f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({old_max + 1}, {watermarks.max});"
    c.testdrive(statement, mz_service=state.mz_service)
Run this action on the provided composition.
class ShiftForward(DML):
    """Update all rows in a table by incrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the watermarks up by ``delta`` and add ``delta`` to every f1."""
        step = self.delta
        self.table.watermarks.shift(step)

        statement = f"> UPDATE {self.table.name} SET f1 = f1 + {step};"
        c.testdrive(statement, mz_service=state.mz_service)
Update all rows in a table by incrementing their values by a constant.
def run(self, c: Composition, state: State) -> None:
    """Shift the watermarks up by ``delta`` and add ``delta`` to every f1."""
    step = self.delta
    self.table.watermarks.shift(step)

    statement = f"> UPDATE {self.table.name} SET f1 = f1 + {step};"
    c.testdrive(statement, mz_service=state.mz_service)
Run this action on the provided composition.
class ShiftBackward(DML):
    """Update all rows in a table by decrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the watermarks down by ``delta`` and subtract ``delta`` from every f1."""
        step = self.delta
        self.table.watermarks.shift(-step)

        statement = f"> UPDATE {self.table.name} SET f1 = f1 - {step};"
        c.testdrive(statement, mz_service=state.mz_service)
Update all rows in a table by decrementing their values by a constant.
def run(self, c: Composition, state: State) -> None:
    """Shift the watermarks down by ``delta`` and subtract ``delta`` from every f1."""
    step = self.delta
    self.table.watermarks.shift(-step)

    statement = f"> UPDATE {self.table.name} SET f1 = f1 - {step};"
    c.testdrive(statement, mz_service=state.mz_service)
Run this action on the provided composition.
class DeleteFromHead(DML):
    """Delete the largest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        """Lower the maximum watermark by ``delta`` and delete the rows above it."""
        watermarks = self.table.watermarks

        # Clamp so that the maximum never drops below the minimum watermark.
        lowered = watermarks.max - self.delta
        floor = watermarks.min
        watermarks.max = lowered if lowered >= floor else floor

        statement = f"> DELETE FROM {self.table.name} WHERE f1 > {watermarks.max};"
        c.testdrive(statement, mz_service=state.mz_service)
Delete the largest values from a table. The maximum watermark is never lowered below the minimum watermark.
def run(self, c: Composition, state: State) -> None:
    """Lower the maximum watermark by ``delta`` and delete the rows above it."""
    watermarks = self.table.watermarks

    # Clamp so that the maximum never drops below the minimum watermark.
    lowered = watermarks.max - self.delta
    floor = watermarks.min
    watermarks.max = lowered if lowered >= floor else floor

    statement = f"> DELETE FROM {self.table.name} WHERE f1 > {watermarks.max};"
    c.testdrive(statement, mz_service=state.mz_service)
Run this action on the provided composition.
class DeleteFromTail(DML):
    """Delete the smallest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        """Raise the minimum watermark by ``delta`` and delete the rows below it."""
        watermarks = self.table.watermarks

        # Clamp so that the minimum never rises above the maximum watermark.
        raised = watermarks.min + self.delta
        ceiling = watermarks.max
        watermarks.min = raised if raised <= ceiling else ceiling

        statement = f"> DELETE FROM {self.table.name} WHERE f1 < {watermarks.min};"
        c.testdrive(statement, mz_service=state.mz_service)
Delete the smallest values from a table. The minimum watermark is never raised above the maximum watermark.
def run(self, c: Composition, state: State) -> None:
    """Raise the minimum watermark by ``delta`` and delete the rows below it."""
    watermarks = self.table.watermarks

    # Clamp so that the minimum never rises above the maximum watermark.
    raised = watermarks.min + self.delta
    ceiling = watermarks.max
    watermarks.min = raised if raised <= ceiling else ceiling

    statement = f"> DELETE FROM {self.table.name} WHERE f1 < {watermarks.min};"
    c.testdrive(statement, mz_service=state.mz_service)
Run this action on the provided composition.