Module materialize.zippy.view_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.debezium_capabilities import DebeziumSourceExists
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
from materialize.zippy.source_capabilities import SourceExists
from materialize.zippy.storaged_capabilities import StoragedRunning
from materialize.zippy.table_capabilities import TableExists
from materialize.zippy.view_capabilities import ViewExists, WatermarkedObjects


class CreateViewParameterized(ActionFactory):
    """Emits CreateView Actions within the constraints specified in the constructor.

    Inputs for each new view are sampled at random from the sources and
    tables found in the capabilities passed to new().
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        """Return the capability sets this factory declares as requirements.

        Each set combines BalancerdIsRunning and MzIsRunning with one kind of
        object that new() accepts as a view input.
        """
        return [
            {BalancerdIsRunning, MzIsRunning, SourceExists},
            {BalancerdIsRunning, MzIsRunning, TableExists},
            {BalancerdIsRunning, MzIsRunning, DebeziumSourceExists},
            {BalancerdIsRunning, MzIsRunning, PostgresCdcTableExists},
            {BalancerdIsRunning, MzIsRunning, MySqlCdcTableExists},
        ]

    def __init__(
        self,
        max_views: int = 10,
        max_inputs: int = 5,
        expensive_aggregates: bool = True,
    ) -> None:
        """Store the limits applied when generating views.

        Args:
            max_views: limit passed to get_free_capability_name() when asking
                for a new view name.
            max_inputs: upper bound on the number of inputs sampled per view.
            expensive_aggregates: forwarded to every ViewExists created here.
        """
        self.max_views = max_views
        self.max_inputs = max_inputs
        self.expensive_aggregates = expensive_aggregates

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateView action, or an empty list if none can be built.

        An empty list is returned when no free view name is available or when
        the capabilities contain no object that could serve as a view input.
        """
        new_view_name = capabilities.get_free_capability_name(
            ViewExists, self.max_views
        )
        if not new_view_name:
            return []

        potential_inputs: WatermarkedObjects = []
        for source_capability in [
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ]:
            potential_inputs.extend(capabilities.get(source_capability))

        if not potential_inputs:
            # CreateView.run() reads inputs[0]; a view without inputs would
            # fail there with an IndexError, so emit nothing instead.
            return []

        # Pick between 1 and max_inputs distinct inputs, capped by how many exist.
        inputs = random.sample(
            potential_inputs,
            min(len(potential_inputs), random.randint(1, self.max_inputs)),
        )

        return [
            CreateView(
                capabilities=capabilities,
                view=ViewExists(
                    name=new_view_name,
                    has_index=random.choice([True, False]),
                    expensive_aggregates=self.expensive_aggregates,
                    inputs=inputs,
                ),
            )
        ]


class CreateView(Action):
    """Creates a materialized view joining one or more sources or tables on f1."""

    def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
        """Remember the view to create, then run the base Action constructor."""
        self.view = view
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Send the CREATE MATERIALIZED VIEW statement, plus an optional index, to testdrive."""
        first_name = self.view.inputs[0].name

        # Every input after the first is joined to the first one on column f1.
        join_clauses = " ".join(
            f"JOIN {joined.name} USING (f1)" for joined in self.view.inputs[1:]
        )

        if self.view.has_index:
            index = f"> CREATE DEFAULT INDEX ON {self.view.name}"
        else:
            index = ""

        select_items = [f"COUNT({first_name}.f1) AS count_all"]
        if self.view.expensive_aggregates:
            select_items += [
                f"COUNT(DISTINCT {first_name}.f1) AS count_distinct",
                f"MIN({first_name}.f1) AS min_value",
                f"MAX({first_name}.f1) AS max_value",
            ]
        aggregates = ", ".join(select_items)

        # The interval is drawn before the strategy is chosen, even if the
        # ON COMMIT strategy ends up being selected.
        refresh_seconds = random.randint(1, 5)
        refresh = random.choice(["ON COMMIT", f"EVERY '{refresh_seconds} seconds'"])

        c.testdrive(
            dedent(
                f"""
                > CREATE MATERIALIZED VIEW {self.view.name}
                  WITH (REFRESH {refresh}) AS
                  SELECT {aggregates}
                  FROM {first_name}
                  {join_clauses}
                """
            )
            + index
        )

    def provides(self) -> list[Capability]:
        """Return the created view as the only capability provided by this action."""
        return [self.view]


class ValidateView(Action):
    """Checks the aggregates exposed by a view against its expected watermarks."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action declares as requirements."""
        return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}

    def __init__(
        self, capabilities: Capabilities, view: ViewExists | None = None
    ) -> None:
        """Pick the view to validate; a random existing view is used when none is given."""
        self.view = (
            random.choice(capabilities.get(ViewExists)) if view is None else view
        )

        # Trigger the PeekPersist optimization
        self.select_limit = random.choice(["", "LIMIT 1"])
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Query the view through testdrive and compare it with the watermark-derived values.

        Nothing is sent when the minimum watermark is above the maximum one.
        """
        watermarks = self.view.get_watermarks()
        low = watermarks.min
        high = watermarks.max

        if not (low <= high):
            return

        # Number of values in the inclusive range [low, high].
        expected_rows = (high - low) + 1

        if self.view.expensive_aggregates:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_rows} count_distinct = {expected_rows} min_value = {low} max_value = {high} */ ;
                    {expected_rows} {expected_rows} {low} {high}
                """
                )
            )
        else:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_rows} */ ;
                    {expected_rows}
                """
                )
            )

Classes

class CreateView (capabilities: Capabilities, view: ViewExists)

Creates a view that is a join over one or more sources or tables

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreateView(Action):
    """Creates a materialized view joining one or more sources or tables on f1."""

    def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
        """Remember the view to create, then run the base Action constructor."""
        self.view = view
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Send the CREATE MATERIALIZED VIEW statement, plus an optional index, to testdrive."""
        first_name = self.view.inputs[0].name

        # Every input after the first is joined to the first one on column f1.
        join_clauses = " ".join(
            f"JOIN {joined.name} USING (f1)" for joined in self.view.inputs[1:]
        )

        if self.view.has_index:
            index = f"> CREATE DEFAULT INDEX ON {self.view.name}"
        else:
            index = ""

        select_items = [f"COUNT({first_name}.f1) AS count_all"]
        if self.view.expensive_aggregates:
            select_items += [
                f"COUNT(DISTINCT {first_name}.f1) AS count_distinct",
                f"MIN({first_name}.f1) AS min_value",
                f"MAX({first_name}.f1) AS max_value",
            ]
        aggregates = ", ".join(select_items)

        # The interval is drawn before the strategy is chosen, even if the
        # ON COMMIT strategy ends up being selected.
        refresh_seconds = random.randint(1, 5)
        refresh = random.choice(["ON COMMIT", f"EVERY '{refresh_seconds} seconds'"])

        c.testdrive(
            dedent(
                f"""
                > CREATE MATERIALIZED VIEW {self.view.name}
                  WITH (REFRESH {refresh}) AS
                  SELECT {aggregates}
                  FROM {first_name}
                  {join_clauses}
                """
            )
            + index
        )

    def provides(self) -> list[Capability]:
        """Return the created view as the only capability provided by this action."""
        return [self.view]

Ancestors

Inherited members

class CreateViewParameterized (max_views: int = 10, max_inputs: int = 5, expensive_aggregates: bool = True)

Emits CreateView Actions within the constraints specified in the constructor.

Expand source code Browse git
class CreateViewParameterized(ActionFactory):
    """Emits CreateView Actions within the constraints specified in the constructor.

    Inputs for each new view are sampled at random from the sources and
    tables found in the capabilities passed to new().
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        """Return the capability sets this factory declares as requirements.

        Each set combines BalancerdIsRunning and MzIsRunning with one kind of
        object that new() accepts as a view input.
        """
        return [
            {BalancerdIsRunning, MzIsRunning, SourceExists},
            {BalancerdIsRunning, MzIsRunning, TableExists},
            {BalancerdIsRunning, MzIsRunning, DebeziumSourceExists},
            {BalancerdIsRunning, MzIsRunning, PostgresCdcTableExists},
            {BalancerdIsRunning, MzIsRunning, MySqlCdcTableExists},
        ]

    def __init__(
        self,
        max_views: int = 10,
        max_inputs: int = 5,
        expensive_aggregates: bool = True,
    ) -> None:
        """Store the limits applied when generating views.

        Args:
            max_views: limit passed to get_free_capability_name() when asking
                for a new view name.
            max_inputs: upper bound on the number of inputs sampled per view.
            expensive_aggregates: forwarded to every ViewExists created here.
        """
        self.max_views = max_views
        self.max_inputs = max_inputs
        self.expensive_aggregates = expensive_aggregates

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateView action, or an empty list if none can be built.

        An empty list is returned when no free view name is available or when
        the capabilities contain no object that could serve as a view input.
        """
        new_view_name = capabilities.get_free_capability_name(
            ViewExists, self.max_views
        )
        if not new_view_name:
            return []

        potential_inputs: WatermarkedObjects = []
        for source_capability in [
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ]:
            potential_inputs.extend(capabilities.get(source_capability))

        if not potential_inputs:
            # CreateView.run() reads inputs[0]; a view without inputs would
            # fail there with an IndexError, so emit nothing instead.
            return []

        # Pick between 1 and max_inputs distinct inputs, capped by how many exist.
        inputs = random.sample(
            potential_inputs,
            min(len(potential_inputs), random.randint(1, self.max_inputs)),
        )

        return [
            CreateView(
                capabilities=capabilities,
                view=ViewExists(
                    name=new_view_name,
                    has_index=random.choice([True, False]),
                    expensive_aggregates=self.expensive_aggregates,
                    inputs=inputs,
                ),
            )
        ]

Ancestors

Methods

def new(self, capabilities: Capabilities) ‑> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Return a single CreateView action, or an empty list if none can be built.

    An empty list is returned when no free view name is available or when
    the capabilities contain no object that could serve as a view input.
    """
    new_view_name = capabilities.get_free_capability_name(
        ViewExists, self.max_views
    )
    if not new_view_name:
        return []

    potential_inputs: WatermarkedObjects = []
    for source_capability in [
        SourceExists,
        TableExists,
        DebeziumSourceExists,
        PostgresCdcTableExists,
        MySqlCdcTableExists,
    ]:
        potential_inputs.extend(capabilities.get(source_capability))

    if not potential_inputs:
        # CreateView.run() reads inputs[0]; a view without inputs would
        # fail there with an IndexError, so emit nothing instead.
        return []

    # Pick between 1 and max_inputs distinct inputs, capped by how many exist.
    inputs = random.sample(
        potential_inputs,
        min(len(potential_inputs), random.randint(1, self.max_inputs)),
    )

    return [
        CreateView(
            capabilities=capabilities,
            view=ViewExists(
                name=new_view_name,
                has_index=random.choice([True, False]),
                expensive_aggregates=self.expensive_aggregates,
                inputs=inputs,
            ),
        )
    ]

Inherited members

class ValidateView (capabilities: Capabilities, view: ViewExists | None = None)

Validates a view.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class ValidateView(Action):
    """Checks the aggregates exposed by a view against its expected watermarks."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action declares as requirements."""
        return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}

    def __init__(
        self, capabilities: Capabilities, view: ViewExists | None = None
    ) -> None:
        """Pick the view to validate; a random existing view is used when none is given."""
        self.view = (
            random.choice(capabilities.get(ViewExists)) if view is None else view
        )

        # Trigger the PeekPersist optimization
        self.select_limit = random.choice(["", "LIMIT 1"])
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Query the view through testdrive and compare it with the watermark-derived values.

        Nothing is sent when the minimum watermark is above the maximum one.
        """
        watermarks = self.view.get_watermarks()
        low = watermarks.min
        high = watermarks.max

        if not (low <= high):
            return

        # Number of values in the inclusive range [low, high].
        expected_rows = (high - low) + 1

        if self.view.expensive_aggregates:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_rows} count_distinct = {expected_rows} min_value = {low} max_value = {high} */ ;
                    {expected_rows} {expected_rows} {low} {high}
                """
                )
            )
        else:
            c.testdrive(
                dedent(
                    f"""
                    > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_rows} */ ;
                    {expected_rows}
                """
                )
            )

Ancestors

Inherited members