Module materialize.zippy.table_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.table_capabilities import TableExists
# Default upper bound for the per-action `delta` of DML actions. It is the
# default of CreateTableParameterized(max_rows_per_action=...), which forwards
# it to every TableExists capability it creates.
MAX_ROWS_PER_ACTION = 10_000
class CreateTableParameterized(ActionFactory):
    """Factory producing a CreateTable action whenever a free table name exists.

    The free name comes from `capabilities.get_free_capability_name`, bounded by
    `max_tables`. Each new table gets a default index with 50% probability.
    `max_rows_per_action` is forwarded to the TableExists capability, where DML
    actions use it as the upper bound of their random `delta`.
    """

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        self.max_tables = max_tables
        self.max_rows_per_action = max_rows_per_action

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateTable action, or an empty list if no name is free."""
        name = capabilities.get_free_capability_name(TableExists, self.max_tables)
        if not name:
            # No free table name available: nothing to create this time.
            return []

        table = TableExists(
            name=name,
            has_index=random.choice([True, False]),
            max_rows_per_action=self.max_rows_per_action,
        )
        return [CreateTable(capabilities=capabilities, table=table)]
class CreateTable(Action):
    """Create a table on the Mz instance, seeded with one row.

    The seed row holds the table's current max watermark. 50% of the tables
    (decided by CreateTableParameterized) also get a default index.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        assert (
            table is not None
        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        name = self.table.name

        # Left as an empty line in the script when the table has no index.
        index_stmt = ""
        if self.table.has_index:
            index_stmt = f"> CREATE DEFAULT INDEX ON {name}"

        c.testdrive(
            dedent(
                f"""
                > CREATE TABLE {name} (f1 INTEGER);
                {index_stmt}
                > INSERT INTO {name} VALUES ({self.table.watermarks.max});
                """
            )
        )

    def provides(self) -> list[Capability]:
        return [self.table]
class ValidateTable(Action):
    """Validate one table against its expected min/max watermarks.

    The check expects MIN(f1) and MAX(f1) to equal the watermarks. It expects
    both COUNT(f1) and COUNT(DISTINCT f1) to equal `max - min + 1`, i.e. every
    integer in the range is present exactly once.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        # Only draw a random table when the caller did not supply one.
        self.table = (
            table if table is not None else random.choice(capabilities.get(TableExists))
        )
        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        name = self.table.name
        low = self.table.watermarks.min
        high = self.table.watermarks.max
        row_count = (high - low) + 1
        expected = f"{low} {high} {row_count} {row_count}"

        if not self.select_limit:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {name};
                    {expected}
                    """
                )
            )
            return

        # Validating via SELECT ... LIMIT is expensive because it needs a
        # temporary table, so __init__ only enables it for ~20% of validations.
        c.testdrive(
            dedent(
                f"""
                > CREATE TEMPORARY TABLE {name}_select_limit (f1 INTEGER);
                > INSERT INTO {name}_select_limit SELECT * FROM {name} LIMIT 999999999;
                > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {name}_select_limit;
                {expected}
                > DROP TABLE {name}_select_limit
                """
            )
        )
class DML(Action):
    """Base class for actions that INSERT, DELETE or UPDATE rows of one table.

    On construction it picks a random existing table and a random `delta` in
    [1, table.max_rows_per_action]. Subclasses implement `run()`. Each subclass
    in this module updates `table.watermarks` to mirror its change, so that
    ValidateTable can check the result.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.table = random.choice(capabilities.get(TableExists))
        # Magnitude of the change: rows inserted/deleted, or the shift amount.
        self.delta = random.randint(1, self.table.max_rows_per_action)
        super().__init__(capabilities)

    def __str__(self) -> str:
        # super() honors the MRO instead of hard-coding the Action base class.
        return f"{super().__str__()} {self.table.name}"
class Insert(DML):
    """Insert `delta` consecutive values directly above the current max watermark."""

    def run(self, c: Composition) -> None:
        first = self.table.watermarks.max + 1
        last = self.table.watermarks.max + self.delta
        self.table.watermarks.max = last
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({first}, {last});"
        )
class ShiftForward(DML):
    """Add `delta` to every value in the table with a single UPDATE."""

    def run(self, c: Composition) -> None:
        amount = self.delta
        # Record the shift in the expected watermarks before issuing the UPDATE.
        self.table.watermarks.shift(amount)
        c.testdrive(f"> UPDATE {self.table.name} SET f1 = f1 + {amount};")
class ShiftBackward(DML):
    """Subtract `delta` from every value in the table with a single UPDATE."""

    def run(self, c: Composition) -> None:
        amount = self.delta
        # Record the (negative) shift in the expected watermarks first.
        self.table.watermarks.shift(-amount)
        c.testdrive(f"> UPDATE {self.table.name} SET f1 = f1 - {amount};")
class DeleteFromHead(DML):
    """Delete the largest values from a table.

    The max watermark is lowered by `delta`, but never below the min watermark.
    Every value above the new max is then deleted.
    """

    def run(self, c: Composition) -> None:
        marks = self.table.watermarks
        marks.max = max(marks.max - self.delta, marks.min)
        c.testdrive(f"> DELETE FROM {self.table.name} WHERE f1 > {marks.max};")
class DeleteFromTail(DML):
    """Delete the smallest values from a table.

    The min watermark is raised by `delta`, but never above the max watermark.
    Every value below the new min is then deleted.
    """

    def run(self, c: Composition) -> None:
        marks = self.table.watermarks
        marks.min = min(marks.min + self.delta, marks.max)
        c.testdrive(f"> DELETE FROM {self.table.name} WHERE f1 < {marks.min};")
Classes
class CreateTable (table: TableExists, capabilities: Capabilities)
-
Creates a table on the Mz instance. 50% of the tables have a default index.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateTable(Action):
    """Create a table on the Mz instance, seeded with one row.

    The seed row holds the table's current max watermark. 50% of the tables
    (decided by CreateTableParameterized) also get a default index.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
        assert (
            table is not None
        ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
        self.table = table
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        name = self.table.name

        # Left as an empty line in the script when the table has no index.
        index_stmt = ""
        if self.table.has_index:
            index_stmt = f"> CREATE DEFAULT INDEX ON {name}"

        c.testdrive(
            dedent(
                f"""
                > CREATE TABLE {name} (f1 INTEGER);
                {index_stmt}
                > INSERT INTO {name} VALUES ({self.table.watermarks.max});
                """
            )
        )

    def provides(self) -> list[Capability]:
        return [self.table]
Ancestors
Inherited members
class CreateTableParameterized (max_tables: int = 10, max_rows_per_action: int = 10000)
-
Base class for Action Factories that return parameterized Actions to execute.
Expand source code Browse git
class CreateTableParameterized(ActionFactory):
    """Factory producing a CreateTable action whenever a free table name exists.

    The free name comes from `capabilities.get_free_capability_name`, bounded by
    `max_tables`. Each new table gets a default index with 50% probability.
    `max_rows_per_action` is forwarded to the TableExists capability, where DML
    actions use it as the upper bound of their random `delta`.
    """

    def __init__(
        self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
    ) -> None:
        self.max_tables = max_tables
        self.max_rows_per_action = max_rows_per_action

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning}

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateTable action, or an empty list if no name is free."""
        name = capabilities.get_free_capability_name(TableExists, self.max_tables)
        if not name:
            # No free table name available: nothing to create this time.
            return []

        table = TableExists(
            name=name,
            has_index=random.choice([True, False]),
            max_rows_per_action=self.max_rows_per_action,
        )
        return [CreateTable(capabilities=capabilities, table=table)]
Ancestors
Methods
def new(self, capabilities: Capabilities) -> list[Action]
-
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Return a single CreateTable action, or an empty list if no name is free."""
    name = capabilities.get_free_capability_name(TableExists, self.max_tables)
    if not name:
        # No free table name available: nothing to create this time.
        return []

    table = TableExists(
        name=name,
        has_index=random.choice([True, False]),
        max_rows_per_action=self.max_rows_per_action,
    )
    return [CreateTable(capabilities=capabilities, table=table)]
Inherited members
class DML (capabilities: Capabilities)
-
Performs an INSERT, DELETE or UPDATE against a table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class DML(Action):
    """Base class for actions that INSERT, DELETE or UPDATE rows of one table.

    On construction it picks a random existing table and a random `delta` in
    [1, table.max_rows_per_action]. Subclasses implement `run()`. Each subclass
    in this module updates `table.watermarks` to mirror its change, so that
    ValidateTable can check the result.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.table = random.choice(capabilities.get(TableExists))
        # Magnitude of the change: rows inserted/deleted, or the shift amount.
        self.delta = random.randint(1, self.table.max_rows_per_action)
        super().__init__(capabilities)

    def __str__(self) -> str:
        # super() honors the MRO instead of hard-coding the Action base class.
        return f"{super().__str__()} {self.table.name}"
Ancestors
Subclasses
Inherited members
class DeleteFromHead (capabilities: Capabilities)
-
Deletes the largest values from a table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class DeleteFromHead(DML):
    """Delete the largest values from a table.

    The max watermark is lowered by `delta`, but never below the min watermark.
    Every value above the new max is then deleted.
    """

    def run(self, c: Composition) -> None:
        marks = self.table.watermarks
        marks.max = max(marks.max - self.delta, marks.min)
        c.testdrive(f"> DELETE FROM {self.table.name} WHERE f1 > {marks.max};")
Ancestors
Inherited members
class DeleteFromTail (capabilities: Capabilities)
-
Deletes the smallest values from a table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class DeleteFromTail(DML):
    """Delete the smallest values from a table.

    The min watermark is raised by `delta`, but never above the max watermark.
    Every value below the new min is then deleted.
    """

    def run(self, c: Composition) -> None:
        marks = self.table.watermarks
        marks.min = min(marks.min + self.delta, marks.max)
        c.testdrive(f"> DELETE FROM {self.table.name} WHERE f1 < {marks.min};")
Ancestors
Inherited members
class Insert (capabilities: Capabilities)
-
Inserts rows into a table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class Insert(DML):
    """Insert `delta` consecutive values directly above the current max watermark."""

    def run(self, c: Composition) -> None:
        first = self.table.watermarks.max + 1
        last = self.table.watermarks.max + self.delta
        self.table.watermarks.max = last
        c.testdrive(
            f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({first}, {last});"
        )
Ancestors
Inherited members
class ShiftBackward (capabilities: Capabilities)
-
Updates all rows in a table by decrementing their values by a constant.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class ShiftBackward(DML):
    """Subtract `delta` from every value in the table with a single UPDATE."""

    def run(self, c: Composition) -> None:
        amount = self.delta
        # Record the (negative) shift in the expected watermarks first.
        self.table.watermarks.shift(-amount)
        c.testdrive(f"> UPDATE {self.table.name} SET f1 = f1 - {amount};")
Ancestors
Inherited members
class ShiftForward (capabilities: Capabilities)
-
Updates all rows in a table by incrementing their values by a constant.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class ShiftForward(DML):
    """Add `delta` to every value in the table with a single UPDATE."""

    def run(self, c: Composition) -> None:
        amount = self.delta
        # Record the shift in the expected watermarks before issuing the UPDATE.
        self.table.watermarks.shift(amount)
        c.testdrive(f"> UPDATE {self.table.name} SET f1 = f1 + {amount};")
Ancestors
Inherited members
class ValidateTable (capabilities: Capabilities, table: TableExists | None = None)
-
Validates that a single table contains data that is consistent with the expected min/max watermark.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class ValidateTable(Action):
    """Validate one table against its expected min/max watermarks.

    The check expects MIN(f1) and MAX(f1) to equal the watermarks. It expects
    both COUNT(f1) and COUNT(DISTINCT f1) to equal `max - min + 1`, i.e. every
    integer in the range is present exactly once.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, TableExists}

    def __init__(
        self, capabilities: Capabilities, table: TableExists | None = None
    ) -> None:
        # Only draw a random table when the caller did not supply one.
        self.table = (
            table if table is not None else random.choice(capabilities.get(TableExists))
        )
        self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        name = self.table.name
        low = self.table.watermarks.min
        high = self.table.watermarks.max
        row_count = (high - low) + 1
        expected = f"{low} {high} {row_count} {row_count}"

        if not self.select_limit:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {name};
                    {expected}
                    """
                )
            )
            return

        # Validating via SELECT ... LIMIT is expensive because it needs a
        # temporary table, so __init__ only enables it for ~20% of validations.
        c.testdrive(
            dedent(
                f"""
                > CREATE TEMPORARY TABLE {name}_select_limit (f1 INTEGER);
                > INSERT INTO {name}_select_limit SELECT * FROM {name} LIMIT 999999999;
                > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {name}_select_limit;
                {expected}
                > DROP TABLE {name}_select_limit
                """
            )
        )
Ancestors
Inherited members