Module materialize.zippy.mysql_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
from materialize.zippy.mz_capabilities import MzIsRunning


class MySqlStart(Action):
    """Start a MySQL instance."""

    def provides(self) -> list[Capability]:
        return [MySqlRunning()]

    def run(self, c: Composition) -> None:
        c.up("mysql")

        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                DROP DATABASE IF EXISTS public;
                CREATE DATABASE public;
                USE public;
                """
            )
        )


class MySqlStop(Action):
    """Stop the MySQL instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MySqlRunning}

    def withholds(self) -> set[type[Capability]]:
        return {MySqlRunning}

    def run(self, c: Composition) -> None:
        c.kill("mysql")


class MySqlRestart(Action):
    """Restart the MySql instance."""

    def run(self, c: Composition) -> None:
        c.kill("mysql")
        c.up("mysql")


class CreateMySqlTable(Action):
    """Creates a table on the MySql instance. 50% of the tables have a PK."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, MySqlRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10)))

        existing_mysql_tables = [
            t
            for t in capabilities.get(MySqlTableExists)
            if t.name == this_mysql_table.name
        ]

        if len(existing_mysql_tables) == 0:
            self.new_mysql_table = True
            # A PK is now required for Debezium
            this_mysql_table.has_pk = True

            self.mysql_table = this_mysql_table
        elif len(existing_mysql_tables) == 1:
            self.new_mysql_table = False
            self.mysql_table = existing_mysql_tables[0]
        else:
            assert False

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        if self.new_mysql_table:
            primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else ""
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key});
                    INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max});
                    """
                )
            )

    def provides(self) -> list[Capability]:
        return [self.mysql_table] if self.new_mysql_table else []


class MySqlDML(Action):
    """Performs an INSERT, DELETE or UPDATE against a MySQL table."""

    # We use smaller batches in Pg then in Mz because Pg will fill up much faster
    MAX_BATCH_SIZE = 10000

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.mysql_table = random.choice(capabilities.get(MySqlTableExists))
        self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE)

        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.mysql_table.name}"


class MySqlInsert(MySqlDML):
    """Inserts rows into a MySQL table."""

    def run(self, c: Composition) -> None:
        prev_max = self.mysql_table.watermarks.max
        self.mysql_table.watermarks.max = prev_max + self.delta
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                SET @i:={prev_max};
                INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max};
                """
            )
        )


class MySqlShiftForward(MySqlDML):
    """Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)"""

    def run(self, c: Composition) -> None:
        if not self.mysql_table.has_pk:
            self.mysql_table.watermarks.shift(self.delta)
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta};
                    """
                )
            )


class MySqlShiftBackward(MySqlDML):
    """Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)"""

    def run(self, c: Composition) -> None:
        if not self.mysql_table.has_pk:
            self.mysql_table.watermarks.shift(-self.delta)
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta};
                    """
                )
            )


class MySqlDeleteFromHead(MySqlDML):
    """Delete the largest values from a MySQL table"""

    def run(self, c: Composition) -> None:
        self.mysql_table.watermarks.max = max(
            self.mysql_table.watermarks.max - self.delta,
            self.mysql_table.watermarks.min,
        )
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max};
                """
            )
        )


class MySqlDeleteFromTail(MySqlDML):
    """Delete the smallest values from a MySQL table"""

    def run(self, c: Composition) -> None:
        self.mysql_table.watermarks.min = min(
            self.mysql_table.watermarks.min + self.delta,
            self.mysql_table.watermarks.max,
        )
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min};
                """
            )
        )

Classes

class CreateMySqlTable (capabilities: Capabilities)

Creates a table on the MySql instance. 50% of the tables have a PK.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreateMySqlTable(Action):
    """Creates a table on the MySql instance. 50% of the tables have a PK."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, MySqlRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10)))

        existing_mysql_tables = [
            t
            for t in capabilities.get(MySqlTableExists)
            if t.name == this_mysql_table.name
        ]

        if len(existing_mysql_tables) == 0:
            self.new_mysql_table = True
            # A PK is now required for Debezium
            this_mysql_table.has_pk = True

            self.mysql_table = this_mysql_table
        elif len(existing_mysql_tables) == 1:
            self.new_mysql_table = False
            self.mysql_table = existing_mysql_tables[0]
        else:
            assert False

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        if self.new_mysql_table:
            primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else ""
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key});
                    INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max});
                    """
                )
            )

    def provides(self) -> list[Capability]:
        return [self.mysql_table] if self.new_mysql_table else []

Ancestors

Inherited members

class MySqlDML (capabilities: Capabilities)

Performs an INSERT, DELETE or UPDATE against a MySQL table.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlDML(Action):
    """Performs an INSERT, DELETE or UPDATE against a MySQL table."""

    # We use smaller batches in Pg then in Mz because Pg will fill up much faster
    MAX_BATCH_SIZE = 10000

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.mysql_table = random.choice(capabilities.get(MySqlTableExists))
        self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE)

        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.mysql_table.name}"

Ancestors

Subclasses

Class variables

var MAX_BATCH_SIZE

Inherited members

class MySqlDeleteFromHead (capabilities: Capabilities)

Delete the largest values from a MySQL table

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlDeleteFromHead(MySqlDML):
    """Delete the largest values from a MySQL table"""

    def run(self, c: Composition) -> None:
        self.mysql_table.watermarks.max = max(
            self.mysql_table.watermarks.max - self.delta,
            self.mysql_table.watermarks.min,
        )
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max};
                """
            )
        )

Ancestors

Inherited members

class MySqlDeleteFromTail (capabilities: Capabilities)

Delete the smallest values from a MySQL table

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlDeleteFromTail(MySqlDML):
    """Delete the smallest values from a MySQL table"""

    def run(self, c: Composition) -> None:
        self.mysql_table.watermarks.min = min(
            self.mysql_table.watermarks.min + self.delta,
            self.mysql_table.watermarks.max,
        )
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min};
                """
            )
        )

Ancestors

Inherited members

class MySqlInsert (capabilities: Capabilities)

Inserts rows into a MySQL table.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlInsert(MySqlDML):
    """Inserts rows into a MySQL table."""

    def run(self, c: Composition) -> None:
        prev_max = self.mysql_table.watermarks.max
        self.mysql_table.watermarks.max = prev_max + self.delta
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public;
                SET @i:={prev_max};
                INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max};
                """
            )
        )

Ancestors

Inherited members

class MySqlRestart (capabilities: Capabilities)

Restart the MySql instance.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlRestart(Action):
    """Restart the MySql instance."""

    def run(self, c: Composition) -> None:
        c.kill("mysql")
        c.up("mysql")

Ancestors

Inherited members

class MySqlShiftBackward (capabilities: Capabilities)

Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlShiftBackward(MySqlDML):
    """Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)"""

    def run(self, c: Composition) -> None:
        if not self.mysql_table.has_pk:
            self.mysql_table.watermarks.shift(-self.delta)
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta};
                    """
                )
            )

Ancestors

Inherited members

class MySqlShiftForward (capabilities: Capabilities)

Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlShiftForward(MySqlDML):
    """Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)"""

    def run(self, c: Composition) -> None:
        if not self.mysql_table.has_pk:
            self.mysql_table.watermarks.shift(self.delta)
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                    $ mysql-execute name=mysql
                    USE public;
                    UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta};
                    """
                )
            )

Ancestors

Inherited members

class MySqlStart (capabilities: Capabilities)

Start a MySQL instance.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlStart(Action):
    """Start a MySQL instance."""

    def provides(self) -> list[Capability]:
        return [MySqlRunning()]

    def run(self, c: Composition) -> None:
        c.up("mysql")

        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                DROP DATABASE IF EXISTS public;
                CREATE DATABASE public;
                USE public;
                """
            )
        )

Ancestors

Inherited members

class MySqlStop (capabilities: Capabilities)

Stop the MySQL instance.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MySqlStop(Action):
    """Stop the MySQL instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MySqlRunning}

    def withholds(self) -> set[type[Capability]]:
        return {MySqlRunning}

    def run(self, c: Composition) -> None:
        c.kill("mysql")

Ancestors

Inherited members