misc.python.materialize.zippy.table_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import random
 11from textwrap import dedent
 12
 13from materialize.mzcompose.composition import Composition
 14from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
 15from materialize.zippy.framework import (
 16    Action,
 17    ActionFactory,
 18    Capabilities,
 19    Capability,
 20    State,
 21)
 22from materialize.zippy.mz_capabilities import MzIsRunning
 23from materialize.zippy.table_capabilities import TableExists
 24
# Default for the `max_rows_per_action` argument of CreateTableParameterized.
# The value is stored on each TableExists capability, and DML actions draw
# their random `delta` from the range 1..max_rows_per_action (inclusive).
MAX_ROWS_PER_ACTION = 10000
 26
 27
 28class CreateTableParameterized(ActionFactory):
 29    def __init__(
 30        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
 31    ) -> None:
 32        self.max_tables = max_tables
 33        self.max_rows_per_action = max_rows_per_action
 34
 35    @classmethod
 36    def requires(cls) -> set[type[Capability]]:
 37        return {BalancerdIsRunning, MzIsRunning}
 38
 39    def new(self, capabilities: Capabilities) -> list[Action]:
 40        new_table_name = capabilities.get_free_capability_name(
 41            TableExists, self.max_tables
 42        )
 43
 44        if new_table_name:
 45            return [
 46                CreateTable(
 47                    capabilities=capabilities,
 48                    table=TableExists(
 49                        name=new_table_name,
 50                        has_index=random.choice([True, False]),
 51                        max_rows_per_action=self.max_rows_per_action,
 52                    ),
 53                )
 54            ]
 55        else:
 56            return []
 57
 58
 59class CreateTable(Action):
 60    """Creates a table on the Mz instance. 50% of the tables have a default index."""
 61
 62    @classmethod
 63    def requires(cls) -> set[type[Capability]]:
 64        return {BalancerdIsRunning, MzIsRunning}
 65
 66    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
 67        assert (
 68            table is not None
 69        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
 70        self.table = table
 71        super().__init__(capabilities)
 72
 73    def run(self, c: Composition, state: State) -> None:
 74        index = (
 75            f"> CREATE DEFAULT INDEX ON {self.table.name}"
 76            if self.table.has_index
 77            else ""
 78        )
 79        c.testdrive(
 80            dedent(f"""
 81                > CREATE TABLE {self.table.name} (f1 INTEGER);
 82                {index}
 83                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
 84                """),
 85            mz_service=state.mz_service,
 86        )
 87
 88    def provides(self) -> list[Capability]:
 89        return [self.table]
 90
 91
 92class ValidateTable(Action):
 93    """Validates that a single table contains data that is consistent with the expected min/max watermark."""
 94
 95    @classmethod
 96    def requires(cls) -> set[type[Capability]]:
 97        return {BalancerdIsRunning, MzIsRunning, TableExists}
 98
 99    def __init__(
100        self, capabilities: Capabilities, table: TableExists | None = None
101    ) -> None:
102        if table is not None:
103            self.table = table
104        else:
105            self.table = random.choice(capabilities.get(TableExists))
106
107        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
108        super().__init__(capabilities)
109
110    def run(self, c: Composition, state: State) -> None:
111        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
112        # Therefore, only use it in 20% of validations.
113        if self.select_limit:
114            c.testdrive(
115                dedent(f"""
116                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
117                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
118                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
119                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
120                    > DROP TABLE {self.table.name}_select_limit
121                    """),
122                mz_service=state.mz_service,
123            )
124        else:
125            c.testdrive(
126                dedent(f"""
127                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
128                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
129                    """),
130                mz_service=state.mz_service,
131            )
132
133
class DML(Action):
    """Common base for actions that INSERT into, DELETE from or UPDATE a table.

    On construction a random existing table is selected together with a random
    `delta` between 1 and that table's `max_rows_per_action` (inclusive).
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability classes this action depends on."""
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        existing_tables = capabilities.get(TableExists)
        self.table = random.choice(existing_tables)
        upper_bound = self.table.max_rows_per_action
        self.delta = random.randint(1, upper_bound)
        super().__init__(capabilities)

    def __str__(self) -> str:
        """Return the base Action description followed by the target table name."""
        description = Action.__str__(self)
        return f"{description} {self.table.name}"
148
149
class Insert(DML):
    """Appends `delta` consecutive values above the table's current maximum."""

    def run(self, c: Composition, state: State) -> None:
        """Raise the max watermark by `delta` and insert the newly covered values."""
        watermarks = self.table.watermarks
        first_new = watermarks.max + 1
        last_new = watermarks.max + self.delta
        # The expected state is advanced before the statement is executed.
        watermarks.max = last_new
        statement = f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({first_new}, {last_new});"
        c.testdrive(statement, mz_service=state.mz_service)
160
161
class ShiftForward(DML):
    """Increments the value of every row in a table by `delta`."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the tracked watermarks up by `delta`, then apply the matching UPDATE."""
        step = self.delta
        self.table.watermarks.shift(step)
        statement = f"> UPDATE {self.table.name} SET f1 = f1 + {step};"
        c.testdrive(statement, mz_service=state.mz_service)
171
172
class ShiftBackward(DML):
    """Decrements the value of every row in a table by `delta`."""

    def run(self, c: Composition, state: State) -> None:
        """Shift the tracked watermarks down by `delta`, then apply the matching UPDATE."""
        step = self.delta
        self.table.watermarks.shift(-step)
        statement = f"> UPDATE {self.table.name} SET f1 = f1 - {step};"
        c.testdrive(statement, mz_service=state.mz_service)
182
183
class DeleteFromHead(DML):
    """Deletes the largest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        """Lower the max watermark by up to `delta` and delete every row above it."""
        watermarks = self.table.watermarks
        lowered_max = watermarks.max - self.delta
        # Never drop below the min watermark.
        if lowered_max < watermarks.min:
            lowered_max = watermarks.min
        watermarks.max = lowered_max
        statement = f"> DELETE FROM {self.table.name} WHERE f1 > {watermarks.max};"
        c.testdrive(statement, mz_service=state.mz_service)
195
196
class DeleteFromTail(DML):
    """Deletes the smallest values from a table."""

    def run(self, c: Composition, state: State) -> None:
        """Raise the min watermark by up to `delta` and delete every row below it."""
        watermarks = self.table.watermarks
        raised_min = watermarks.min + self.delta
        # Never rise above the max watermark.
        if raised_min > watermarks.max:
            raised_min = watermarks.max
        watermarks.min = raised_min
        statement = f"> DELETE FROM {self.table.name} WHERE f1 < {watermarks.min};"
        c.testdrive(statement, mz_service=state.mz_service)
MAX_ROWS_PER_ACTION = 10000
class CreateTableParameterized(materialize.zippy.framework.ActionFactory):
29class CreateTableParameterized(ActionFactory):
30    def __init__(
31        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
32    ) -> None:
33        self.max_tables = max_tables
34        self.max_rows_per_action = max_rows_per_action
35
36    @classmethod
37    def requires(cls) -> set[type[Capability]]:
38        return {BalancerdIsRunning, MzIsRunning}
39
40    def new(self, capabilities: Capabilities) -> list[Action]:
41        new_table_name = capabilities.get_free_capability_name(
42            TableExists, self.max_tables
43        )
44
45        if new_table_name:
46            return [
47                CreateTable(
48                    capabilities=capabilities,
49                    table=TableExists(
50                        name=new_table_name,
51                        has_index=random.choice([True, False]),
52                        max_rows_per_action=self.max_rows_per_action,
53                    ),
54                )
55            ]
56        else:
57            return []

Base class for Action Factories that return parameterized Actions to execute.

CreateTableParameterized(max_tables: int = 10, max_rows_per_action: int = 10000)
30    def __init__(
31        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
32    ) -> None:
33        self.max_tables = max_tables
34        self.max_rows_per_action = max_rows_per_action
max_tables
max_rows_per_action
@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
36    @classmethod
37    def requires(cls) -> set[type[Capability]]:
38        return {BalancerdIsRunning, MzIsRunning}

Compute the capability classes that this Action Factory requires.

def new( self, capabilities: materialize.zippy.framework.Capabilities) -> list[materialize.zippy.framework.Action]:
40    def new(self, capabilities: Capabilities) -> list[Action]:
41        new_table_name = capabilities.get_free_capability_name(
42            TableExists, self.max_tables
43        )
44
45        if new_table_name:
46            return [
47                CreateTable(
48                    capabilities=capabilities,
49                    table=TableExists(
50                        name=new_table_name,
51                        has_index=random.choice([True, False]),
52                        max_rows_per_action=self.max_rows_per_action,
53                    ),
54                )
55            ]
56        else:
57            return []
class CreateTable(materialize.zippy.framework.Action):
60class CreateTable(Action):
61    """Creates a table on the Mz instance. 50% of the tables have a default index."""
62
63    @classmethod
64    def requires(cls) -> set[type[Capability]]:
65        return {BalancerdIsRunning, MzIsRunning}
66
67    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
68        assert (
69            table is not None
70        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
71        self.table = table
72        super().__init__(capabilities)
73
74    def run(self, c: Composition, state: State) -> None:
75        index = (
76            f"> CREATE DEFAULT INDEX ON {self.table.name}"
77            if self.table.has_index
78            else ""
79        )
80        c.testdrive(
81            dedent(f"""
82                > CREATE TABLE {self.table.name} (f1 INTEGER);
83                {index}
84                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
85                """),
86            mz_service=state.mz_service,
87        )
88
89    def provides(self) -> list[Capability]:
90        return [self.table]

Creates a table on the Mz instance. 50% of the tables have a default index.

CreateTable( table: materialize.zippy.table_capabilities.TableExists, capabilities: materialize.zippy.framework.Capabilities)
67    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
68        assert (
69            table is not None
70        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
71        self.table = table
72        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
63    @classmethod
64    def requires(cls) -> set[type[Capability]]:
65        return {BalancerdIsRunning, MzIsRunning}

Compute the capability classes that this action requires.

table
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
74    def run(self, c: Composition, state: State) -> None:
75        index = (
76            f"> CREATE DEFAULT INDEX ON {self.table.name}"
77            if self.table.has_index
78            else ""
79        )
80        c.testdrive(
81            dedent(f"""
82                > CREATE TABLE {self.table.name} (f1 INTEGER);
83                {index}
84                > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
85                """),
86            mz_service=state.mz_service,
87        )

Run this action on the provided composition.

def provides(self) -> list[materialize.zippy.framework.Capability]:
89    def provides(self) -> list[Capability]:
90        return [self.table]

Compute the capabilities that this action will make available.

class ValidateTable(materialize.zippy.framework.Action):
 93class ValidateTable(Action):
 94    """Validates that a single table contains data that is consistent with the expected min/max watermark."""
 95
 96    @classmethod
 97    def requires(cls) -> set[type[Capability]]:
 98        return {BalancerdIsRunning, MzIsRunning, TableExists}
 99
100    def __init__(
101        self, capabilities: Capabilities, table: TableExists | None = None
102    ) -> None:
103        if table is not None:
104            self.table = table
105        else:
106            self.table = random.choice(capabilities.get(TableExists))
107
108        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
109        super().__init__(capabilities)
110
111    def run(self, c: Composition, state: State) -> None:
112        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
113        # Therefore, only use it in 20% of validations.
114        if self.select_limit:
115            c.testdrive(
116                dedent(f"""
117                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
118                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
119                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
120                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
121                    > DROP TABLE {self.table.name}_select_limit
122                    """),
123                mz_service=state.mz_service,
124            )
125        else:
126            c.testdrive(
127                dedent(f"""
128                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
129                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
130                    """),
131                mz_service=state.mz_service,
132            )

Validates that a single table contains data that is consistent with the expected min/max watermark.

ValidateTable( capabilities: materialize.zippy.framework.Capabilities, table: materialize.zippy.table_capabilities.TableExists | None = None)
100    def __init__(
101        self, capabilities: Capabilities, table: TableExists | None = None
102    ) -> None:
103        if table is not None:
104            self.table = table
105        else:
106            self.table = random.choice(capabilities.get(TableExists))
107
108        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
109        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
96    @classmethod
97    def requires(cls) -> set[type[Capability]]:
98        return {BalancerdIsRunning, MzIsRunning, TableExists}

Compute the capability classes that this action requires.

select_limit
def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
111    def run(self, c: Composition, state: State) -> None:
112        # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
113        # Therefore, only use it in 20% of validations.
114        if self.select_limit:
115            c.testdrive(
116                dedent(f"""
117                    > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
118                    > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
119                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
120                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
121                    > DROP TABLE {self.table.name}_select_limit
122                    """),
123                mz_service=state.mz_service,
124            )
125        else:
126            c.testdrive(
127                dedent(f"""
128                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
129                    {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
130                    """),
131                mz_service=state.mz_service,
132            )

Run this action on the provided composition.

class DML(materialize.zippy.framework.Action):
135class DML(Action):
136    """Performs an INSERT, DELETE or UPDATE against a table."""
137
138    @classmethod
139    def requires(cls) -> set[type[Capability]]:
140        return {BalancerdIsRunning, MzIsRunning, TableExists}
141
142    def __init__(self, capabilities: Capabilities) -> None:
143        self.table = random.choice(capabilities.get(TableExists))
144        self.delta = random.randint(1, self.table.max_rows_per_action)
145        super().__init__(capabilities)
146
147    def __str__(self) -> str:
148        return f"{Action.__str__(self)} {self.table.name}"

Performs an INSERT, DELETE or UPDATE against a table.

DML(capabilities: materialize.zippy.framework.Capabilities)
142    def __init__(self, capabilities: Capabilities) -> None:
143        self.table = random.choice(capabilities.get(TableExists))
144        self.delta = random.randint(1, self.table.max_rows_per_action)
145        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
138    @classmethod
139    def requires(cls) -> set[type[Capability]]:
140        return {BalancerdIsRunning, MzIsRunning, TableExists}

Compute the capability classes that this action requires.

table
delta
class Insert(DML):
151class Insert(DML):
152    """Inserts rows into a table."""
153
154    def run(self, c: Composition, state: State) -> None:
155        prev_max = self.table.watermarks.max
156        self.table.watermarks.max = prev_max + self.delta
157        c.testdrive(
158            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
159            mz_service=state.mz_service,
160        )

Inserts rows into a table.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
154    def run(self, c: Composition, state: State) -> None:
155        prev_max = self.table.watermarks.max
156        self.table.watermarks.max = prev_max + self.delta
157        c.testdrive(
158            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
159            mz_service=state.mz_service,
160        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class ShiftForward(DML):
163class ShiftForward(DML):
164    """Update all rows from a table by incrementing their values by a constant."""
165
166    def run(self, c: Composition, state: State) -> None:
167        self.table.watermarks.shift(self.delta)
168        c.testdrive(
169            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
170            mz_service=state.mz_service,
171        )

Update all rows from a table by incrementing their values by a constant.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
166    def run(self, c: Composition, state: State) -> None:
167        self.table.watermarks.shift(self.delta)
168        c.testdrive(
169            f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
170            mz_service=state.mz_service,
171        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class ShiftBackward(DML):
174class ShiftBackward(DML):
175    """Update all rows from a table by decrementing their values by a constant."""
176
177    def run(self, c: Composition, state: State) -> None:
178        self.table.watermarks.shift(-self.delta)
179        c.testdrive(
180            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
181            mz_service=state.mz_service,
182        )

Update all rows from a table by decrementing their values by a constant.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
177    def run(self, c: Composition, state: State) -> None:
178        self.table.watermarks.shift(-self.delta)
179        c.testdrive(
180            f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
181            mz_service=state.mz_service,
182        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class DeleteFromHead(DML):
185class DeleteFromHead(DML):
186    """Delete the largest values from a table"""
187
188    def run(self, c: Composition, state: State) -> None:
189        self.table.watermarks.max = max(
190            self.table.watermarks.max - self.delta, self.table.watermarks.min
191        )
192        c.testdrive(
193            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
194            mz_service=state.mz_service,
195        )

Delete the largest values from a table

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
188    def run(self, c: Composition, state: State) -> None:
189        self.table.watermarks.max = max(
190            self.table.watermarks.max - self.delta, self.table.watermarks.min
191        )
192        c.testdrive(
193            f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
194            mz_service=state.mz_service,
195        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta
class DeleteFromTail(DML):
198class DeleteFromTail(DML):
199    """Delete the smallest values from a table"""
200
201    def run(self, c: Composition, state: State) -> None:
202        self.table.watermarks.min = min(
203            self.table.watermarks.min + self.delta, self.table.watermarks.max
204        )
205        c.testdrive(
206            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
207            mz_service=state.mz_service,
208        )

Delete the smallest values from a table

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
201    def run(self, c: Composition, state: State) -> None:
202        self.table.watermarks.min = min(
203            self.table.watermarks.min + self.delta, self.table.watermarks.max
204        )
205        c.testdrive(
206            f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
207            mz_service=state.mz_service,
208        )

Run this action on the provided composition.

Inherited Members
DML
DML
requires
table
delta