Module materialize.zippy.view_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.debezium_capabilities import DebeziumSourceExists
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
from materialize.zippy.source_capabilities import SourceExists
from materialize.zippy.storaged_capabilities import StoragedRunning
from materialize.zippy.table_capabilities import TableExists
from materialize.zippy.view_capabilities import ViewExists, WatermarkedObjects
class CreateViewParameterized(ActionFactory):
    """Factory producing CreateView actions, bounded by the constructor's limits.

    ``max_views`` bounds how many view names are handed out, ``max_inputs``
    bounds how many input objects a single view reads from, and
    ``expensive_aggregates`` is forwarded to every view this factory creates.
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        # One alternative per kind of input object: balancerd and Materialize
        # must be running, and at least one object of that kind must exist.
        input_kinds: list[type[Capability]] = [
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ]
        return [{BalancerdIsRunning, MzIsRunning, kind} for kind in input_kinds]

    def __init__(
        self,
        max_views: int = 10,
        max_inputs: int = 5,
        expensive_aggregates: bool = True,
    ) -> None:
        # Passed to get_free_capability_name to bound the available view names.
        self.max_views = max_views
        # Upper bound on how many inputs are sampled for one view.
        self.max_inputs = max_inputs
        # Copied onto every ViewExists this factory creates.
        self.expensive_aggregates = expensive_aggregates

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return one CreateView action, or [] when no free view name is left."""
        view_name = capabilities.get_free_capability_name(
            ViewExists, self.max_views
        )
        if not view_name:
            return []

        # Gather every existing object that can serve as an input to a view.
        candidates: WatermarkedObjects = []
        for input_kind in (
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ):
            candidates.extend(capabilities.get(input_kind))

        # Pick between 1 and max_inputs inputs, capped by what actually exists.
        wanted = random.randint(1, self.max_inputs)
        chosen = random.sample(candidates, min(len(candidates), wanted))
        with_index = random.choice([True, False])

        view = ViewExists(
            name=view_name,
            has_index=with_index,
            expensive_aggregates=self.expensive_aggregates,
            inputs=chosen,
        )
        return [CreateView(capabilities=capabilities, view=view)]
class CreateView(Action):
    """Creates a materialized view that aggregates over a join of its inputs.

    All of ``view.inputs`` are joined on their shared ``f1`` column. The view
    selects ``count_all`` and, when ``view.expensive_aggregates`` is set, also
    ``count_distinct``, ``min_value`` and ``max_value``, all computed over the
    first input's ``f1``. The refresh strategy is chosen at random (ON COMMIT
    or EVERY 1-5 seconds), and a default index is created on the view when
    ``view.has_index`` is set.
    """

    def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
        self.view = view
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        first_input = self.view.inputs[0]
        # One inner JOIN per additional input, all keyed on the f1 column.
        joins = " ".join(f"JOIN {f.name} USING (f1)" for f in self.view.inputs[1:])
        index = (
            f"> CREATE DEFAULT INDEX ON {self.view.name}" if self.view.has_index else ""
        )

        aggregate_exprs = [f"COUNT({first_input.name}.f1) AS count_all"]
        if self.view.expensive_aggregates:
            aggregate_exprs.extend(
                [
                    f"COUNT(DISTINCT {first_input.name}.f1) AS count_distinct",
                    f"MIN({first_input.name}.f1) AS min_value",
                    f"MAX({first_input.name}.f1) AS max_value",
                ]
            )
        # Joined into a separate name: rebinding the list[str] variable to a
        # str is rejected by mypy's default settings.
        select_list = ", ".join(aggregate_exprs)

        # Randomly exercise both the ON COMMIT and the periodic refresh modes.
        refresh = random.choice(
            ["ON COMMIT", f"EVERY '{random.randint(1, 5)} seconds'"]
        )

        # The indented lines continue the `>` statement in testdrive syntax;
        # dedent() removes only the common Python-level indentation.
        c.testdrive(
            dedent(
                f"""
                > CREATE MATERIALIZED VIEW {self.view.name}
                  WITH (REFRESH {refresh}) AS
                  SELECT {select_list}
                  FROM {first_input.name}
                  {joins}
                """
            )
            + index
        )

    def provides(self) -> list[Capability]:
        return [self.view]
class ValidateView(Action):
    """Validates a view's aggregates against its expected watermark range."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}

    def __init__(
        self, capabilities: Capabilities, view: ViewExists | None = None
    ) -> None:
        # Validate the given view, or fall back to a random existing one.
        self.view = (
            view if view is not None else random.choice(capabilities.get(ViewExists))
        )
        # Trigger the PeekPersist optimization
        self.select_limit = random.choice(["", "LIMIT 1"])
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        watermarks = self.view.get_watermarks()
        low = watermarks.min
        high = watermarks.max
        if not (low <= high):
            # Nothing is validated when min exceeds max.
            return

        # Every value in [low, high] is expected, so the counts equal its size.
        expected_count = (high - low) + 1
        if self.view.expensive_aggregates:
            script = dedent(
                f"""
                > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} count_distinct = {expected_count} min_value = {low} max_value = {high} */ ;
                {expected_count} {expected_count} {low} {high}
                """
            )
        else:
            script = dedent(
                f"""
                > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} */ ;
                {expected_count}
                """
            )
        c.testdrive(script)
Classes
class CreateView (capabilities: Capabilities, view: ViewExists)
-
Creates a materialized view that aggregates over a join of one or more sources or tables.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateView(Action):
    """Creates a materialized view that aggregates over a join of its inputs.

    All of ``view.inputs`` are joined on their shared ``f1`` column. The view
    selects ``count_all`` and, when ``view.expensive_aggregates`` is set, also
    ``count_distinct``, ``min_value`` and ``max_value``, all computed over the
    first input's ``f1``. The refresh strategy is chosen at random (ON COMMIT
    or EVERY 1-5 seconds), and a default index is created on the view when
    ``view.has_index`` is set.
    """

    def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
        self.view = view
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        first_input = self.view.inputs[0]
        # One inner JOIN per additional input, all keyed on the f1 column.
        joins = " ".join(f"JOIN {f.name} USING (f1)" for f in self.view.inputs[1:])
        index = (
            f"> CREATE DEFAULT INDEX ON {self.view.name}" if self.view.has_index else ""
        )

        aggregate_exprs = [f"COUNT({first_input.name}.f1) AS count_all"]
        if self.view.expensive_aggregates:
            aggregate_exprs.extend(
                [
                    f"COUNT(DISTINCT {first_input.name}.f1) AS count_distinct",
                    f"MIN({first_input.name}.f1) AS min_value",
                    f"MAX({first_input.name}.f1) AS max_value",
                ]
            )
        # Joined into a separate name: rebinding the list[str] variable to a
        # str is rejected by mypy's default settings.
        select_list = ", ".join(aggregate_exprs)

        # Randomly exercise both the ON COMMIT and the periodic refresh modes.
        refresh = random.choice(
            ["ON COMMIT", f"EVERY '{random.randint(1, 5)} seconds'"]
        )

        # The indented lines continue the `>` statement in testdrive syntax;
        # dedent() removes only the common Python-level indentation.
        c.testdrive(
            dedent(
                f"""
                > CREATE MATERIALIZED VIEW {self.view.name}
                  WITH (REFRESH {refresh}) AS
                  SELECT {select_list}
                  FROM {first_input.name}
                  {joins}
                """
            )
            + index
        )

    def provides(self) -> list[Capability]:
        return [self.view]
Ancestors
Inherited members
class CreateViewParameterized (max_views: int = 10, max_inputs: int = 5, expensive_aggregates: bool = True)
-
Emits CreateView Actions within the constraints specified in the constructor.
Expand source code Browse git
class CreateViewParameterized(ActionFactory):
    """Factory producing CreateView actions, bounded by the constructor's limits.

    ``max_views`` bounds how many view names are handed out, ``max_inputs``
    bounds how many input objects a single view reads from, and
    ``expensive_aggregates`` is forwarded to every view this factory creates.
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        # One alternative per kind of input object: balancerd and Materialize
        # must be running, and at least one object of that kind must exist.
        input_kinds: list[type[Capability]] = [
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ]
        return [{BalancerdIsRunning, MzIsRunning, kind} for kind in input_kinds]

    def __init__(
        self,
        max_views: int = 10,
        max_inputs: int = 5,
        expensive_aggregates: bool = True,
    ) -> None:
        # Passed to get_free_capability_name to bound the available view names.
        self.max_views = max_views
        # Upper bound on how many inputs are sampled for one view.
        self.max_inputs = max_inputs
        # Copied onto every ViewExists this factory creates.
        self.expensive_aggregates = expensive_aggregates

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return one CreateView action, or [] when no free view name is left."""
        view_name = capabilities.get_free_capability_name(
            ViewExists, self.max_views
        )
        if not view_name:
            return []

        # Gather every existing object that can serve as an input to a view.
        candidates: WatermarkedObjects = []
        for input_kind in (
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ):
            candidates.extend(capabilities.get(input_kind))

        # Pick between 1 and max_inputs inputs, capped by what actually exists.
        wanted = random.randint(1, self.max_inputs)
        chosen = random.sample(candidates, min(len(candidates), wanted))
        with_index = random.choice([True, False])

        view = ViewExists(
            name=view_name,
            has_index=with_index,
            expensive_aggregates=self.expensive_aggregates,
            inputs=chosen,
        )
        return [CreateView(capabilities=capabilities, view=view)]
Ancestors
Methods
def new(self, capabilities: Capabilities) ‑> list[Action]
-
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Return one CreateView action, or [] when no free view name is left."""
    view_name = capabilities.get_free_capability_name(
        ViewExists, self.max_views
    )
    if not view_name:
        return []

    # Gather every existing object that can serve as an input to a view.
    candidates: WatermarkedObjects = []
    for input_kind in (
        SourceExists,
        TableExists,
        DebeziumSourceExists,
        PostgresCdcTableExists,
        MySqlCdcTableExists,
    ):
        candidates.extend(capabilities.get(input_kind))

    # Pick between 1 and max_inputs inputs, capped by what actually exists.
    wanted = random.randint(1, self.max_inputs)
    chosen = random.sample(candidates, min(len(candidates), wanted))
    with_index = random.choice([True, False])

    view = ViewExists(
        name=view_name,
        has_index=with_index,
        expensive_aggregates=self.expensive_aggregates,
        inputs=chosen,
    )
    return [CreateView(capabilities=capabilities, view=view)]
Inherited members
class ValidateView (capabilities: Capabilities, view: ViewExists | None = None)
-
Validates a view by checking its aggregates against the expected watermark range.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class ValidateView(Action):
    """Validates a view's aggregates against its expected watermark range."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}

    def __init__(
        self, capabilities: Capabilities, view: ViewExists | None = None
    ) -> None:
        # Validate the given view, or fall back to a random existing one.
        self.view = (
            view if view is not None else random.choice(capabilities.get(ViewExists))
        )
        # Trigger the PeekPersist optimization
        self.select_limit = random.choice(["", "LIMIT 1"])
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        watermarks = self.view.get_watermarks()
        low = watermarks.min
        high = watermarks.max
        if not (low <= high):
            # Nothing is validated when min exceeds max.
            return

        # Every value in [low, high] is expected, so the counts equal its size.
        expected_count = (high - low) + 1
        if self.view.expensive_aggregates:
            script = dedent(
                f"""
                > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} count_distinct = {expected_count} min_value = {low} max_value = {high} */ ;
                {expected_count} {expected_count} {low} {high}
                """
            )
        else:
            script = dedent(
                f"""
                > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} */ ;
                {expected_count}
                """
            )
        c.testdrive(script)
Ancestors
Inherited members