Module materialize.output_consistency.output_consistency_test
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
import pg8000
from pg8000 import Connection
from pg8000.exceptions import InterfaceError
from materialize.output_consistency.common.configuration import (
ConsistencyTestConfiguration,
)
from materialize.output_consistency.execution.evaluation_strategy import (
ConstantFoldingEvaluation,
DataFlowRenderingEvaluation,
EvaluationStrategy,
)
from materialize.output_consistency.execution.sql_executor import create_sql_executor
from materialize.output_consistency.execution.sql_executors import SqlExecutors
from materialize.output_consistency.execution.test_summary import ConsistencyTestSummary
from materialize.output_consistency.generators.expression_generator import (
ExpressionGenerator,
)
from materialize.output_consistency.generators.query_generator import QueryGenerator
from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
GenericInconsistencyIgnoreFilter,
)
from materialize.output_consistency.ignore_filter.internal_output_inconsistency_ignore_filter import (
InternalOutputInconsistencyIgnoreFilter,
)
from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
EvaluationScenario,
)
from materialize.output_consistency.input_data.test_input_data import (
ConsistencyTestInputData,
)
from materialize.output_consistency.output.output_printer import OutputPrinter
from materialize.output_consistency.runner.test_runner import ConsistencyTestRunner
from materialize.output_consistency.selection.randomized_picker import RandomizedPicker
from materialize.output_consistency.validation.result_comparator import ResultComparator
class OutputConsistencyTest:
def run_output_consistency_tests(
self,
connection: Connection,
args: argparse.Namespace,
) -> ConsistencyTestSummary:
"""Entry point for output consistency tests"""
return self._run_output_consistency_tests_internal(
connection,
args.seed,
args.dry_run,
args.fail_fast,
args.verbose,
args.max_cols_per_query,
args.max_runtime_in_sec,
args.max_iterations,
args.avoid_expressions_expecting_db_error,
)
def parse_output_consistency_input_args(
self,
parser: argparse.ArgumentParser,
) -> argparse.Namespace:
parser.add_argument("--seed", default="0", type=str)
parser.add_argument(
"--dry-run", default=False, type=bool, action=argparse.BooleanOptionalAction
)
parser.add_argument(
"--fail-fast",
default=False,
type=bool,
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--verbose",
default=False,
type=bool,
action=argparse.BooleanOptionalAction,
)
parser.add_argument("--max-cols-per-query", default=20, type=int)
parser.add_argument("--max-runtime-in-sec", default=600, type=int)
parser.add_argument("--max-iterations", default=100000, type=int)
parser.add_argument(
"--avoid-expressions-expecting-db-error",
default=False,
type=bool,
action=argparse.BooleanOptionalAction,
)
return parser.parse_args()
def _run_output_consistency_tests_internal(
self,
connection: Connection,
random_seed: str,
dry_run: bool,
fail_fast: bool,
verbose_output: bool,
max_cols_per_query: int,
max_runtime_in_sec: int,
max_iterations: int,
avoid_expressions_expecting_db_error: bool,
) -> ConsistencyTestSummary:
input_data = self.create_input_data()
output_printer = OutputPrinter(input_data)
scenario = self.get_scenario()
config = ConsistencyTestConfiguration(
random_seed=random_seed,
scenario=scenario,
dry_run=dry_run,
fail_fast=fail_fast,
verbose_output=verbose_output,
max_cols_per_query=max_cols_per_query,
max_runtime_in_sec=max_runtime_in_sec,
max_iterations=max_iterations,
avoid_expressions_expecting_db_error=avoid_expressions_expecting_db_error,
queries_per_tx=20,
max_pending_expressions=100,
use_autocommit=True,
split_and_retry_on_db_error=True,
print_reproduction_code=True,
postgres_compatible_mode=scenario
== EvaluationScenario.POSTGRES_CONSISTENCY,
)
output_printer.print_config(config)
config.validate()
if config.postgres_compatible_mode:
input_data.remove_postgres_incompatible_data()
randomized_picker = RandomizedPicker(config)
sql_executors = self.create_sql_executors(config, connection, output_printer)
evaluation_strategies = self.create_evaluation_strategies(sql_executors)
ignore_filter = self.create_inconsistency_ignore_filter(sql_executors)
expression_generator = ExpressionGenerator(
config, randomized_picker, input_data
)
query_generator = QueryGenerator(
config, randomized_picker, input_data, ignore_filter
)
output_comparator = self.create_result_comparator(ignore_filter)
output_printer.print_info(sql_executors.get_database_infos())
output_printer.print_empty_line()
output_printer.print_info(input_data.get_stats())
output_printer.print_empty_line()
if not self.shall_run(sql_executors):
output_printer.print_info("Not running the test, criteria are not met.")
return ConsistencyTestSummary()
test_runner = ConsistencyTestRunner(
config,
input_data,
evaluation_strategies,
expression_generator,
query_generator,
output_comparator,
sql_executors,
randomized_picker,
ignore_filter,
output_printer,
)
test_runner.setup()
output_printer.start_section("Test remarks")
if not config.verbose_output:
output_printer.print_info(
"Printing only queries with inconsistencies or warnings in non-verbose mode."
)
output_printer.print_empty_line()
test_summary = test_runner.start()
output_printer.print_test_summary(test_summary)
return test_summary
def shall_run(self, sql_executors: SqlExecutors) -> bool:
return True
def create_input_data(self) -> ConsistencyTestInputData:
return ConsistencyTestInputData()
def create_sql_executors(
self,
config: ConsistencyTestConfiguration,
connection: Connection,
output_printer: OutputPrinter,
) -> SqlExecutors:
return SqlExecutors(
create_sql_executor(config, connection, output_printer, "mz")
)
def get_scenario(self) -> EvaluationScenario:
return EvaluationScenario.OUTPUT_CONSISTENCY
def create_result_comparator(
self, ignore_filter: GenericInconsistencyIgnoreFilter
) -> ResultComparator:
return ResultComparator(ignore_filter)
def create_inconsistency_ignore_filter(
self, sql_executors: SqlExecutors
) -> GenericInconsistencyIgnoreFilter:
return InternalOutputInconsistencyIgnoreFilter()
def create_evaluation_strategies(
self, sql_executors: SqlExecutors
) -> list[EvaluationStrategy]:
return [
DataFlowRenderingEvaluation(),
ConstantFoldingEvaluation(),
]
def connect(host: str, port: int, user: str, password: str | None = None) -> Connection:
try:
print(
f"Connecting to database (host={host}, port={port}, user={user}, password={'****' if password else 'None'})"
)
return pg8000.connect(host=host, port=port, user=user, password=password)
except InterfaceError:
print(f"Connecting to database failed (host={host}, port={port}, user={user})!")
raise
def main() -> int:
test = OutputConsistencyTest()
parser = argparse.ArgumentParser(
prog="output-consistency-test",
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Test the output consistency of different query evaluation strategies (e.g., dataflow rendering "
"and constant folding).",
)
parser.add_argument("--host", default="localhost", type=str)
parser.add_argument("--port", default=6875, type=int)
args = test.parse_output_consistency_input_args(parser)
db_user = "materialize"
try:
connection = connect(args.host, args.port, db_user)
except InterfaceError:
return 1
result = test.run_output_consistency_tests(connection, args)
return 0 if result.all_passed() else 1
if __name__ == "__main__":
exit(main())
Functions
def connect(host: str, port: int, user: str, password: str | None = None) ‑> pg8000.legacy.Connection
-
Expand source code Browse git
def connect(host: str, port: int, user: str, password: str | None = None) -> Connection: try: print( f"Connecting to database (host={host}, port={port}, user={user}, password={'****' if password else 'None'})" ) return pg8000.connect(host=host, port=port, user=user, password=password) except InterfaceError: print(f"Connecting to database failed (host={host}, port={port}, user={user})!") raise
def main() ‑> int
-
Expand source code Browse git
def main() -> int: test = OutputConsistencyTest() parser = argparse.ArgumentParser( prog="output-consistency-test", formatter_class=argparse.RawDescriptionHelpFormatter, description="Test the output consistency of different query evaluation strategies (e.g., dataflow rendering " "and constant folding).", ) parser.add_argument("--host", default="localhost", type=str) parser.add_argument("--port", default=6875, type=int) args = test.parse_output_consistency_input_args(parser) db_user = "materialize" try: connection = connect(args.host, args.port, db_user) except InterfaceError: return 1 result = test.run_output_consistency_tests(connection, args) return 0 if result.all_passed() else 1
Classes
class OutputConsistencyTest
-
Expand source code Browse git
class OutputConsistencyTest: def run_output_consistency_tests( self, connection: Connection, args: argparse.Namespace, ) -> ConsistencyTestSummary: """Entry point for output consistency tests""" return self._run_output_consistency_tests_internal( connection, args.seed, args.dry_run, args.fail_fast, args.verbose, args.max_cols_per_query, args.max_runtime_in_sec, args.max_iterations, args.avoid_expressions_expecting_db_error, ) def parse_output_consistency_input_args( self, parser: argparse.ArgumentParser, ) -> argparse.Namespace: parser.add_argument("--seed", default="0", type=str) parser.add_argument( "--dry-run", default=False, type=bool, action=argparse.BooleanOptionalAction ) parser.add_argument( "--fail-fast", default=False, type=bool, action=argparse.BooleanOptionalAction, ) parser.add_argument( "--verbose", default=False, type=bool, action=argparse.BooleanOptionalAction, ) parser.add_argument("--max-cols-per-query", default=20, type=int) parser.add_argument("--max-runtime-in-sec", default=600, type=int) parser.add_argument("--max-iterations", default=100000, type=int) parser.add_argument( "--avoid-expressions-expecting-db-error", default=False, type=bool, action=argparse.BooleanOptionalAction, ) return parser.parse_args() def _run_output_consistency_tests_internal( self, connection: Connection, random_seed: str, dry_run: bool, fail_fast: bool, verbose_output: bool, max_cols_per_query: int, max_runtime_in_sec: int, max_iterations: int, avoid_expressions_expecting_db_error: bool, ) -> ConsistencyTestSummary: input_data = self.create_input_data() output_printer = OutputPrinter(input_data) scenario = self.get_scenario() config = ConsistencyTestConfiguration( random_seed=random_seed, scenario=scenario, dry_run=dry_run, fail_fast=fail_fast, verbose_output=verbose_output, max_cols_per_query=max_cols_per_query, max_runtime_in_sec=max_runtime_in_sec, max_iterations=max_iterations, avoid_expressions_expecting_db_error=avoid_expressions_expecting_db_error, queries_per_tx=20, max_pending_expressions=100, use_autocommit=True, split_and_retry_on_db_error=True, print_reproduction_code=True, postgres_compatible_mode=scenario == EvaluationScenario.POSTGRES_CONSISTENCY, ) output_printer.print_config(config) config.validate() if config.postgres_compatible_mode: input_data.remove_postgres_incompatible_data() randomized_picker = RandomizedPicker(config) sql_executors = self.create_sql_executors(config, connection, output_printer) evaluation_strategies = self.create_evaluation_strategies(sql_executors) ignore_filter = self.create_inconsistency_ignore_filter(sql_executors) expression_generator = ExpressionGenerator( config, randomized_picker, input_data ) query_generator = QueryGenerator( config, randomized_picker, input_data, ignore_filter ) output_comparator = self.create_result_comparator(ignore_filter) output_printer.print_info(sql_executors.get_database_infos()) output_printer.print_empty_line() output_printer.print_info(input_data.get_stats()) output_printer.print_empty_line() if not self.shall_run(sql_executors): output_printer.print_info("Not running the test, criteria are not met.") return ConsistencyTestSummary() test_runner = ConsistencyTestRunner( config, input_data, evaluation_strategies, expression_generator, query_generator, output_comparator, sql_executors, randomized_picker, ignore_filter, output_printer, ) test_runner.setup() output_printer.start_section("Test remarks") if not config.verbose_output: output_printer.print_info( "Printing only queries with inconsistencies or warnings in non-verbose mode." ) output_printer.print_empty_line() test_summary = test_runner.start() output_printer.print_test_summary(test_summary) return test_summary def shall_run(self, sql_executors: SqlExecutors) -> bool: return True def create_input_data(self) -> ConsistencyTestInputData: return ConsistencyTestInputData() def create_sql_executors( self, config: ConsistencyTestConfiguration, connection: Connection, output_printer: OutputPrinter, ) -> SqlExecutors: return SqlExecutors( create_sql_executor(config, connection, output_printer, "mz") ) def get_scenario(self) -> EvaluationScenario: return EvaluationScenario.OUTPUT_CONSISTENCY def create_result_comparator( self, ignore_filter: GenericInconsistencyIgnoreFilter ) -> ResultComparator: return ResultComparator(ignore_filter) def create_inconsistency_ignore_filter( self, sql_executors: SqlExecutors ) -> GenericInconsistencyIgnoreFilter: return InternalOutputInconsistencyIgnoreFilter() def create_evaluation_strategies( self, sql_executors: SqlExecutors ) -> list[EvaluationStrategy]: return [ DataFlowRenderingEvaluation(), ConstantFoldingEvaluation(), ]
Subclasses
Methods
def create_evaluation_strategies(self, sql_executors: SqlExecutors) ‑> list[EvaluationStrategy]
-
Expand source code Browse git
def create_evaluation_strategies( self, sql_executors: SqlExecutors ) -> list[EvaluationStrategy]: return [ DataFlowRenderingEvaluation(), ConstantFoldingEvaluation(), ]
def create_inconsistency_ignore_filter(self, sql_executors: SqlExecutors) ‑> GenericInconsistencyIgnoreFilter
-
Expand source code Browse git
def create_inconsistency_ignore_filter( self, sql_executors: SqlExecutors ) -> GenericInconsistencyIgnoreFilter: return InternalOutputInconsistencyIgnoreFilter()
def create_input_data(self) ‑> ConsistencyTestInputData
-
Expand source code Browse git
def create_input_data(self) -> ConsistencyTestInputData: return ConsistencyTestInputData()
def create_result_comparator(self, ignore_filter: GenericInconsistencyIgnoreFilter) ‑> ResultComparator
-
Expand source code Browse git
def create_result_comparator( self, ignore_filter: GenericInconsistencyIgnoreFilter ) -> ResultComparator: return ResultComparator(ignore_filter)
def create_sql_executors(self, config: ConsistencyTestConfiguration, connection: pg8000.legacy.Connection, output_printer: OutputPrinter) ‑> SqlExecutors
-
Expand source code Browse git
def create_sql_executors( self, config: ConsistencyTestConfiguration, connection: Connection, output_printer: OutputPrinter, ) -> SqlExecutors: return SqlExecutors( create_sql_executor(config, connection, output_printer, "mz") )
def get_scenario(self) ‑> EvaluationScenario
-
Expand source code Browse git
def get_scenario(self) -> EvaluationScenario: return EvaluationScenario.OUTPUT_CONSISTENCY
def parse_output_consistency_input_args(self, parser: argparse.ArgumentParser) ‑> argparse.Namespace
-
Expand source code Browse git
def parse_output_consistency_input_args( self, parser: argparse.ArgumentParser, ) -> argparse.Namespace: parser.add_argument("--seed", default="0", type=str) parser.add_argument( "--dry-run", default=False, type=bool, action=argparse.BooleanOptionalAction ) parser.add_argument( "--fail-fast", default=False, type=bool, action=argparse.BooleanOptionalAction, ) parser.add_argument( "--verbose", default=False, type=bool, action=argparse.BooleanOptionalAction, ) parser.add_argument("--max-cols-per-query", default=20, type=int) parser.add_argument("--max-runtime-in-sec", default=600, type=int) parser.add_argument("--max-iterations", default=100000, type=int) parser.add_argument( "--avoid-expressions-expecting-db-error", default=False, type=bool, action=argparse.BooleanOptionalAction, ) return parser.parse_args()
def run_output_consistency_tests(self, connection: pg8000.legacy.Connection, args: argparse.Namespace) ‑> ConsistencyTestSummary
-
Entry point for output consistency tests
Expand source code Browse git
def run_output_consistency_tests( self, connection: Connection, args: argparse.Namespace, ) -> ConsistencyTestSummary: """Entry point for output consistency tests""" return self._run_output_consistency_tests_internal( connection, args.seed, args.dry_run, args.fail_fast, args.verbose, args.max_cols_per_query, args.max_runtime_in_sec, args.max_iterations, args.avoid_expressions_expecting_db_error, )
def shall_run(self, sql_executors: SqlExecutors) ‑> bool
-
Expand source code Browse git
def shall_run(self, sql_executors: SqlExecutors) -> bool: return True