misc.python.materialize.zippy.table_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import random
 11from textwrap import dedent
 12
 13from materialize.mzcompose.composition import Composition
 14from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
 15from materialize.zippy.framework import (
 16    Action,
 17    ActionFactory,
 18    Capabilities,
 19    Capability,
 20    State,
 21)
 22from materialize.zippy.mz_capabilities import MzIsRunning
 23from materialize.zippy.table_capabilities import TableExists
 24
 25MAX_ROWS_PER_ACTION = 10000
 26
 27
 28class CreateTableParameterized(ActionFactory):
 29    def __init__(
 30        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
 31    ) -> None:
 32        self.max_tables = max_tables
 33        self.max_rows_per_action = max_rows_per_action
 34
 35    @classmethod
 36    def requires(cls) -> set[type[Capability]]:
 37        return {BalancerdIsRunning, MzIsRunning}
 38
 39    def new(self, capabilities: Capabilities) -> list[Action]:
 40        new_table_name = capabilities.get_free_capability_name(
 41            TableExists, self.max_tables
 42        )
 43
 44        if new_table_name:
 45            return [
 46                CreateTable(
 47                    capabilities=capabilities,
 48                    table=TableExists(
 49                        name=new_table_name,
 50                        has_index=random.choice([True, False]),
 51                        max_rows_per_action=self.max_rows_per_action,
 52                    ),
 53                )
 54            ]
 55        else:
 56            return []
 57
 58
 59class CreateTable(Action):
 60    """Creates a table on the Mz instance. 50% of the tables have a default index."""
 61
 62    @classmethod
 63    def requires(cls) -> set[type[Capability]]:
 64        return {BalancerdIsRunning, MzIsRunning}
 65
 66    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
 67        assert (
 68            table is not None
 69        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
 70        self.table = table
 71        super().__init__(capabilities)
 72
 73    def run(self, c: Composition, state: State) -> None:
 74        index = (
 75            f"> CREATE DEFAULT INDEX ON {self.table.name}"
 76            if self.table.has_index
 77            else ""
 78        )
 79        c.testdrive(
 80            dedent(
 81                f"""
 82                > CREATE TABLE {self.table.name} (f1 INTEGER);
 83                {index}
 84                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
 85                """
 86            ),
 87            mz_service=state.mz_service,
 88        )
 89
 90    def provides(self) -> list[Capability]:
 91        return [self.table]
 92
 93
 94class ValidateTable(Action):
 95    """Validates that a single table contains data that is consistent with the expected min/max watermark."""
 96
 97    @classmethod
 98    def requires(cls) -> set[type[Capability]]:
 99        return {BalancerdIsRunning, MzIsRunning, TableExists}
100
101    def __init__(
102        self, capabilities: Capabilities, table: TableExists | None = None
103    ) -> None:
104        if table is not None:
105            self.table = table
106        else:
107            self.table = random.choice(capabilities.get(TableExists))
108
109        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
110        super().__init__(capabilities)
111
112    def run(self, c: Composition, state: State) -> None:
113        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
114        # Therefore, only use it in 20% of validations.
115        if self.select_limit:
116            c.testdrive(
117                dedent(
118                    f"""
119                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
120                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
121                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
122                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
123                    > DROP TABLE {self.table.name}_select_limit
124                    """
125                ),
126                mz_service=state.mz_service,
127            )
128        else:
129            c.testdrive(
130                dedent(
131                    f"""
132                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
133                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
134                    """
135                ),
136                mz_service=state.mz_service,
137            )
138
139
class DML(Action):
    """Performs an INSERT, DELETE or UPDATE against a table.

    Base class for the concrete DML actions in this module: it picks a random
    existing table and a random ``delta`` in ``[1, table.max_rows_per_action]``.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Requires MzIsRunning, BalancerdIsRunning and at least one TableExists."""
        return {MzIsRunning, BalancerdIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        candidates = capabilities.get(TableExists)
        self.table = random.choice(candidates)
        upper_bound = self.table.max_rows_per_action
        self.delta = random.randint(1, upper_bound)
        super().__init__(capabilities)

    def __str__(self) -> str:
        # Append the target table's name to the generic Action description.
        description = Action.__str__(self)
        return f"{description} {self.table.name}"
154
155
class Insert(DML):
    """Inserts rows into a table.

    Appends the ``delta`` consecutive integers directly above the current max
    watermark, and advances that watermark to the last inserted value.
    """

    def run(self, c: Composition, state: State) -> None:
        marks = self.table.watermarks
        first_new = marks.max + 1
        marks.max += self.delta
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({first_new}, {marks.max});",
            mz_service=state.mz_service,
        )
166
167
class ShiftForward(DML):
    """Update all rows from a table by incrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        offset = self.delta
        # Move the expected watermarks first, then apply the same shift in SQL.
        self.table.watermarks.shift(offset)
        statement = f"> UPDATE {self.table.name} SET f1 = f1 + {offset};"
        c.testdrive(statement, mz_service=state.mz_service)
177
178
class ShiftBackward(DML):
    """Update all rows from a table by decrementing their values by a constant."""

    def run(self, c: Composition, state: State) -> None:
        offset = self.delta
        # Move the expected watermarks down first, then apply the same shift in SQL.
        self.table.watermarks.shift(-offset)
        statement = f"> UPDATE {self.table.name} SET f1 = f1 - {offset};"
        c.testdrive(statement, mz_service=state.mz_service)
188
189
class DeleteFromHead(DML):
    """Delete the largest values from a table.

    Lowers the max watermark by ``delta``, clamped so it never drops below the
    min watermark, then deletes every row above the new max.
    """

    def run(self, c: Composition, state: State) -> None:
        marks = self.table.watermarks
        lowered = marks.max - self.delta
        marks.max = max(lowered, marks.min)
        cutoff = marks.max
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 > {cutoff};",
            mz_service=state.mz_service,
        )
201
202
class DeleteFromTail(DML):
    """Delete the smallest values from a table.

    Raises the min watermark by ``delta``, clamped so it never exceeds the
    max watermark, then deletes every row below the new min.
    """

    def run(self, c: Composition, state: State) -> None:
        marks = self.table.watermarks
        raised = marks.min + self.delta
        marks.min = min(raised, marks.max)
        cutoff = marks.min
        c.testdrive(
            f"> DELETE FROM {self.table.name} WHERE f1 < {cutoff};",
            mz_service=state.mz_service,
        )
MAX_ROWS_PER_ACTION = 10000
class CreateTableParameterized(materialize.zippy.framework.ActionFactory):
29class CreateTableParameterized(ActionFactory):
30    def __init__(
31        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
32    ) -> None:
33        self.max_tables = max_tables
34        self.max_rows_per_action = max_rows_per_action
35
36    @classmethod
37    def requires(cls) -> set[type[Capability]]:
38        return {BalancerdIsRunning, MzIsRunning}
39
40    def new(self, capabilities: Capabilities) -> list[Action]:
41        new_table_name = capabilities.get_free_capability_name(
42            TableExists, self.max_tables
43        )
44
45        if new_table_name:
46            return [
47                CreateTable(
48                    capabilities=capabilities,
49                    table=TableExists(
50                        name=new_table_name,
51                        has_index=random.choice([True, False]),
52                        max_rows_per_action=self.max_rows_per_action,
53                    ),
54                )
55            ]
56        else:
57            return []

Base class for Action Factories that return parameterized Actions to execute.

CreateTableParameterized(max_tables: int = 10, max_rows_per_action: int = 10000)
30    def __init__(
31        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
32    ) -> None:
33        self.max_tables = max_tables
34        self.max_rows_per_action = max_rows_per_action
max_tables
max_rows_per_action
@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
36    @classmethod
37    def requires(cls) -> set[type[Capability]]:
38        return {BalancerdIsRunning, MzIsRunning}

Compute the capability classes that this Action Factory requires.

def new( self, capabilities: materialize.zippy.framework.Capabilities) -> list[materialize.zippy.framework.Action]:
40    def new(self, capabilities: Capabilities) -> list[Action]:
41        new_table_name = capabilities.get_free_capability_name(
42            TableExists, self.max_tables
43        )
44
45        if new_table_name:
46            return [
47                CreateTable(
48                    capabilities=capabilities,
49                    table=TableExists(
50                        name=new_table_name,
51                        has_index=random.choice([True, False]),
52                        max_rows_per_action=self.max_rows_per_action,
53                    ),
54                )
55            ]
56        else:
57            return []
class CreateTable(materialize.zippy.framework.Action):
60class CreateTable(Action):
61    """Creates a table on the Mz instance. 50% of the tables have a default index."""
62
63    @classmethod
64    def requires(cls) -> set[type[Capability]]:
65        return {BalancerdIsRunning, MzIsRunning}
66
67    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
68        assert (
69            table is not None
70        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
71        self.table = table
72        super().__init__(capabilities)
73
74    def run(self, c: Composition, state: State) -> None:
75        index = (
76            f"> CREATE DEFAULT INDEX ON {self.table.name}"
77            if self.table.has_index
78            else ""
79        )
80        c.testdrive(
81            dedent(
82                f"""
83                > CREATE TABLE {self.table.name} (f1 INTEGER);
84                {index}
85                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
86                """
87            ),
88            mz_service=state.mz_service,
89        )
90
91    def provides(self) -> list[Capability]:
92        return [self.table]

Creates a table on the Mz instance. 50% of the tables have a default index.

CreateTable( table: materialize.zippy.table_capabilities.TableExists, capabilities: materialize.zippy.framework.Capabilities)
67    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
68        assert (
69            table is not None
70        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
71        self.table = table
72        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
63    @classmethod
64    def requires(cls) -> set[type[Capability]]:
65        return {BalancerdIsRunning, MzIsRunning}

Compute the capability classes that this action requires.

table
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
74    def run(self, c: Composition, state: State) -> None:
75        index = (
76            f"> CREATE DEFAULT INDEX ON {self.table.name}"
77            if self.table.has_index
78            else ""
79        )
80        c.testdrive(
81            dedent(
82                f"""
83                > CREATE TABLE {self.table.name} (f1 INTEGER);
84                {index}
85                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
86                """
87            ),
88            mz_service=state.mz_service,
89        )

Run this action on the provided composition.

def provides(self) -> list[materialize.zippy.framework.Capability]:
91    def provides(self) -> list[Capability]:
92        return [self.table]

Compute the capabilities that this action will make available.

class ValidateTable(materialize.zippy.framework.Action):
 95class ValidateTable(Action):
 96    """Validates that a single table contains data that is consistent with the expected min/max watermark."""
 97
 98    @classmethod
 99    def requires(cls) -> set[type[Capability]]:
100        return {BalancerdIsRunning, MzIsRunning, TableExists}
101
102    def __init__(
103        self, capabilities: Capabilities, table: TableExists | None = None
104    ) -> None:
105        if table is not None:
106            self.table = table
107        else:
108            self.table = random.choice(capabilities.get(TableExists))
109
110        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
111        super().__init__(capabilities)
112
113    def run(self, c: Composition, state: State) -> None:
114        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
115        # Therefore, only use it in 20% of validations.
116        if self.select_limit:
117            c.testdrive(
118                dedent(
119                    f"""
120                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
121                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
122                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
123                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
124                    > DROP TABLE {self.table.name}_select_limit
125                    """
126                ),
127                mz_service=state.mz_service,
128            )
129        else:
130            c.testdrive(
131                dedent(
132                    f"""
133                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
134                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
135                    """
136                ),
137                mz_service=state.mz_service,
138            )

Validates that a single table contains data that is consistent with the expected min/max watermark.

ValidateTable( capabilities: materialize.zippy.framework.Capabilities, table: materialize.zippy.table_capabilities.TableExists | None = None)
102    def __init__(
103        self, capabilities: Capabilities, table: TableExists | None = None
104    ) -> None:
105        if table is not None:
106            self.table = table
107        else:
108            self.table = random.choice(capabilities.get(TableExists))
109
110        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
111        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
 98    @classmethod
 99    def requires(cls) -> set[type[Capability]]:
100        return {BalancerdIsRunning, MzIsRunning, TableExists}

Compute the capability classes that this action requires.

select_limit
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
113    def run(self, c: Composition, state: State) -> None:
114        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
115        # Therefore, only use it in 20% of validations.
116        if self.select_limit:
117            c.testdrive(
118                dedent(
119                    f"""
120                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
121                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
122                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
123                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
124                    > DROP TABLE {self.table.name}_select_limit
125                    """
126                ),
127                mz_service=state.mz_service,
128            )
129        else:
130            c.testdrive(
131                dedent(
132                    f"""
133                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
134                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
135                    """
136                ),
137                mz_service=state.mz_service,
138            )

Run this action on the provided composition.

class DML(materialize.zippy.framework.Action):
141class DML(Action):
142    """Performs an INSERT, DELETE or UPDATE against a table."""
143
144    @classmethod
145    def requires(cls) -> set[type[Capability]]:
146        return {BalancerdIsRunning, MzIsRunning, TableExists}
147
148    def __init__(self, capabilities: Capabilities) -> None:
149        self.table = random.choice(capabilities.get(TableExists))
150        self.delta = random.randint(1, self.table.max_rows_per_action)
151        super().__init__(capabilities)
152
153    def __str__(self) -> str:
154        return f"{Action.__str__(self)} {self.table.name}"

Performs an INSERT, DELETE or UPDATE against a table.

DML(capabilities: materialize.zippy.framework.Capabilities)
148    def __init__(self, capabilities: Capabilities) -> None:
149        self.table = random.choice(capabilities.get(TableExists))
150        self.delta = random.randint(1, self.table.max_rows_per_action)
151        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
144    @classmethod
145    def requires(cls) -> set[type[Capability]]:
146        return {BalancerdIsRunning, MzIsRunning, TableExists}

Compute the capability classes that this action requires.

table
delta
class Insert(DML):
157class Insert(DML):
158    """Inserts rows into a table."""
159
160    def run(self, c: Composition, state: State) -> None:
161        prev_max = self.table.watermarks.max
162        self.table.watermarks.max = prev_max + self.delta
163        c.testdrive(
164            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
165            mz_service=state.mz_service,
166        )

Inserts rows into a table.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
160    def run(self, c: Composition, state: State) -> None:
161        prev_max = self.table.watermarks.max
162        self.table.watermarks.max = prev_max + self.delta
163        c.testdrive(
164            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
165            mz_service=state.mz_service,
166        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class ShiftForward(DML):
169class ShiftForward(DML):
170    """Update all rows from a table by incrementing their values by a constant."""
171
172    def run(self, c: Composition, state: State) -> None:
173        self.table.watermarks.shift(self.delta)
174        c.testdrive(
175            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
176            mz_service=state.mz_service,
177        )

Update all rows from a table by incrementing their values by a constant.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
172    def run(self, c: Composition, state: State) -> None:
173        self.table.watermarks.shift(self.delta)
174        c.testdrive(
175            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
176            mz_service=state.mz_service,
177        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class ShiftBackward(DML):
180class ShiftBackward(DML):
181    """Update all rows from a table by decrementing their values by a constant."""
182
183    def run(self, c: Composition, state: State) -> None:
184        self.table.watermarks.shift(-self.delta)
185        c.testdrive(
186            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
187            mz_service=state.mz_service,
188        )

Update all rows from a table by decrementing their values by a constant.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
183    def run(self, c: Composition, state: State) -> None:
184        self.table.watermarks.shift(-self.delta)
185        c.testdrive(
186            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
187            mz_service=state.mz_service,
188        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class DeleteFromHead(DML):
191class DeleteFromHead(DML):
192    """Delete the largest values from a table"""
193
194    def run(self, c: Composition, state: State) -> None:
195        self.table.watermarks.max = max(
196            self.table.watermarks.max - self.delta, self.table.watermarks.min
197        )
198        c.testdrive(
199            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
200            mz_service=state.mz_service,
201        )

Delete the largest values from a table

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
194    def run(self, c: Composition, state: State) -> None:
195        self.table.watermarks.max = max(
196            self.table.watermarks.max - self.delta, self.table.watermarks.min
197        )
198        c.testdrive(
199            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
200            mz_service=state.mz_service,
201        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class DeleteFromTail(DML):
204class DeleteFromTail(DML):
205    """Delete the smallest values from a table"""
206
207    def run(self, c: Composition, state: State) -> None:
208        self.table.watermarks.min = min(
209            self.table.watermarks.min + self.delta, self.table.watermarks.max
210        )
211        c.testdrive(
212            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
213            mz_service=state.mz_service,
214        )

Delete the smallest values from a table

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
207    def run(self, c: Composition, state: State) -> None:
208        self.table.watermarks.min = min(
209            self.table.watermarks.min + self.delta, self.table.watermarks.max
210        )
211        c.testdrive(
212            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
213            mz_service=state.mz_service,
214        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta