Module materialize.zippy.mysql_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
from materialize.zippy.mz_capabilities import MzIsRunning
class MySqlStart(Action):
"""Start a MySQL instance."""
def provides(self) -> list[Capability]:
return [MySqlRunning()]
def run(self, c: Composition) -> None:
c.up("mysql")
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
)
)
class MySqlStop(Action):
"""Stop the MySQL instance."""
@classmethod
def requires(cls) -> set[type[Capability]]:
return {MySqlRunning}
def withholds(self) -> set[type[Capability]]:
return {MySqlRunning}
def run(self, c: Composition) -> None:
c.kill("mysql")
class MySqlRestart(Action):
"""Restart the MySql instance."""
def run(self, c: Composition) -> None:
c.kill("mysql")
c.up("mysql")
class CreateMySqlTable(Action):
"""Creates a table on the MySql instance. 50% of the tables have a PK."""
@classmethod
def requires(cls) -> set[type[Capability]]:
return {BalancerdIsRunning, MzIsRunning, MySqlRunning}
def __init__(self, capabilities: Capabilities) -> None:
this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10)))
existing_mysql_tables = [
t
for t in capabilities.get(MySqlTableExists)
if t.name == this_mysql_table.name
]
if len(existing_mysql_tables) == 0:
self.new_mysql_table = True
# A PK is now required for Debezium
this_mysql_table.has_pk = True
self.mysql_table = this_mysql_table
elif len(existing_mysql_tables) == 1:
self.new_mysql_table = False
self.mysql_table = existing_mysql_tables[0]
else:
assert False
super().__init__(capabilities)
def run(self, c: Composition) -> None:
if self.new_mysql_table:
primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else ""
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key});
INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max});
"""
)
)
def provides(self) -> list[Capability]:
return [self.mysql_table] if self.new_mysql_table else []
class MySqlDML(Action):
"""Performs an INSERT, DELETE or UPDATE against a MySQL table."""
# We use smaller batches in Pg then in Mz because Pg will fill up much faster
MAX_BATCH_SIZE = 10000
@classmethod
def requires(cls) -> set[type[Capability]]:
return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists}
def __init__(self, capabilities: Capabilities) -> None:
self.mysql_table = random.choice(capabilities.get(MySqlTableExists))
self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE)
super().__init__(capabilities)
def __str__(self) -> str:
return f"{Action.__str__(self)} {self.mysql_table.name}"
class MySqlInsert(MySqlDML):
"""Inserts rows into a MySQL table."""
def run(self, c: Composition) -> None:
prev_max = self.mysql_table.watermarks.max
self.mysql_table.watermarks.max = prev_max + self.delta
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
SET @i:={prev_max};
INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max};
"""
)
)
class MySqlShiftForward(MySqlDML):
"""Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)"""
def run(self, c: Composition) -> None:
if not self.mysql_table.has_pk:
self.mysql_table.watermarks.shift(self.delta)
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta};
"""
)
)
class MySqlShiftBackward(MySqlDML):
"""Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)"""
def run(self, c: Composition) -> None:
if not self.mysql_table.has_pk:
self.mysql_table.watermarks.shift(-self.delta)
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta};
"""
)
)
class MySqlDeleteFromHead(MySqlDML):
"""Delete the largest values from a MySQL table"""
def run(self, c: Composition) -> None:
self.mysql_table.watermarks.max = max(
self.mysql_table.watermarks.max - self.delta,
self.mysql_table.watermarks.min,
)
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max};
"""
)
)
class MySqlDeleteFromTail(MySqlDML):
"""Delete the smallest values from a MySQL table"""
def run(self, c: Composition) -> None:
self.mysql_table.watermarks.min = min(
self.mysql_table.watermarks.min + self.delta,
self.mysql_table.watermarks.max,
)
c.testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
USE public;
DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min};
"""
)
)
Classes
class CreateMySqlTable (capabilities: Capabilities)
-
Creates a table on the MySql instance. 50% of the tables have a PK.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateMySqlTable(Action): """Creates a table on the MySql instance. 50% of the tables have a PK.""" @classmethod def requires(cls) -> set[type[Capability]]: return {BalancerdIsRunning, MzIsRunning, MySqlRunning} def __init__(self, capabilities: Capabilities) -> None: this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10))) existing_mysql_tables = [ t for t in capabilities.get(MySqlTableExists) if t.name == this_mysql_table.name ] if len(existing_mysql_tables) == 0: self.new_mysql_table = True # A PK is now required for Debezium this_mysql_table.has_pk = True self.mysql_table = this_mysql_table elif len(existing_mysql_tables) == 1: self.new_mysql_table = False self.mysql_table = existing_mysql_tables[0] else: assert False super().__init__(capabilities) def run(self, c: Composition) -> None: if self.new_mysql_table: primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else "" c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key}); INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max}); """ ) ) def provides(self) -> list[Capability]: return [self.mysql_table] if self.new_mysql_table else []
Ancestors
Inherited members
class MySqlDML (capabilities: Capabilities)
-
Performs an INSERT, DELETE or UPDATE against a MySQL table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlDML(Action): """Performs an INSERT, DELETE or UPDATE against a MySQL table.""" # We use smaller batches in Pg then in Mz because Pg will fill up much faster MAX_BATCH_SIZE = 10000 @classmethod def requires(cls) -> set[type[Capability]]: return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists} def __init__(self, capabilities: Capabilities) -> None: self.mysql_table = random.choice(capabilities.get(MySqlTableExists)) self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE) super().__init__(capabilities) def __str__(self) -> str: return f"{Action.__str__(self)} {self.mysql_table.name}"
Ancestors
Subclasses
Class variables
var MAX_BATCH_SIZE
Inherited members
class MySqlDeleteFromHead (capabilities: Capabilities)
-
Delete the largest values from a MySQL table
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlDeleteFromHead(MySqlDML): """Delete the largest values from a MySQL table""" def run(self, c: Composition) -> None: self.mysql_table.watermarks.max = max( self.mysql_table.watermarks.max - self.delta, self.mysql_table.watermarks.min, ) c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max}; """ ) )
Ancestors
Inherited members
class MySqlDeleteFromTail (capabilities: Capabilities)
-
Delete the smallest values from a MySQL table
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlDeleteFromTail(MySqlDML): """Delete the smallest values from a MySQL table""" def run(self, c: Composition) -> None: self.mysql_table.watermarks.min = min( self.mysql_table.watermarks.min + self.delta, self.mysql_table.watermarks.max, ) c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min}; """ ) )
Ancestors
Inherited members
class MySqlInsert (capabilities: Capabilities)
-
Inserts rows into a MySQL table.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlInsert(MySqlDML): """Inserts rows into a MySQL table.""" def run(self, c: Composition) -> None: prev_max = self.mysql_table.watermarks.max self.mysql_table.watermarks.max = prev_max + self.delta c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; SET @i:={prev_max}; INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max}; """ ) )
Ancestors
Inherited members
class MySqlRestart (capabilities: Capabilities)
-
Restart the MySql instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlRestart(Action): """Restart the MySql instance.""" def run(self, c: Composition) -> None: c.kill("mysql") c.up("mysql")
Ancestors
Inherited members
class MySqlShiftBackward (capabilities: Capabilities)
-
Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlShiftBackward(MySqlDML): """Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)""" def run(self, c: Composition) -> None: if not self.mysql_table.has_pk: self.mysql_table.watermarks.shift(-self.delta) c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta}; """ ) )
Ancestors
Inherited members
class MySqlShiftForward (capabilities: Capabilities)
-
Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlShiftForward(MySqlDML): """Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)""" def run(self, c: Composition) -> None: if not self.mysql_table.has_pk: self.mysql_table.watermarks.shift(self.delta) c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta}; """ ) )
Ancestors
Inherited members
class MySqlStart (capabilities: Capabilities)
-
Start a MySQL instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlStart(Action): """Start a MySQL instance.""" def provides(self) -> list[Capability]: return [MySqlRunning()] def run(self, c: Composition) -> None: c.up("mysql") c.testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; """ ) )
Ancestors
Inherited members
class MySqlStop (capabilities: Capabilities)
-
Stop the MySQL instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class MySqlStop(Action): """Stop the MySQL instance.""" @classmethod def requires(cls) -> set[type[Capability]]: return {MySqlRunning} def withholds(self) -> set[type[Capability]]: return {MySqlRunning} def run(self, c: Composition) -> None: c.kill("mysql")
Ancestors
Inherited members