Module materialize.postgres_consistency.postgres_consistency_test
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
from pg8000 import Connection
from pg8000.exceptions import InterfaceError
from materialize.output_consistency.common.configuration import (
ConsistencyTestConfiguration,
)
from materialize.output_consistency.execution.evaluation_strategy import (
DataFlowRenderingEvaluation,
EvaluationStrategy,
)
from materialize.output_consistency.execution.sql_executor import create_sql_executor
from materialize.output_consistency.execution.sql_executors import SqlExecutors
from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
GenericInconsistencyIgnoreFilter,
)
from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
EvaluationScenario,
)
from materialize.output_consistency.input_data.test_input_data import (
ConsistencyTestInputData,
)
from materialize.output_consistency.output.output_printer import OutputPrinter
from materialize.output_consistency.output_consistency_test import (
OutputConsistencyTest,
connect,
)
from materialize.output_consistency.validation.result_comparator import ResultComparator
from materialize.postgres_consistency.custom.predefined_pg_queries import (
create_custom_pg_consistency_queries,
)
from materialize.postgres_consistency.execution.pg_evaluation_strategy import (
PgEvaluation,
)
from materialize.postgres_consistency.execution.pg_sql_executors import PgSqlExecutors
from materialize.postgres_consistency.ignore_filter.pg_inconsistency_ignore_filter import (
PgInconsistencyIgnoreFilter,
)
from materialize.postgres_consistency.validation.pg_result_comparator import (
PostgresResultComparator,
)
class PostgresConsistencyTest(OutputConsistencyTest):
    """Output consistency test that compares Materialize against Postgres.

    The Postgres connection is not passed to the constructor; callers must
    assign ``pg_connection`` before ``create_sql_executors`` is invoked.
    """

    def __init__(self) -> None:
        # Assigned by the caller (see ``main``) once the connection exists.
        self.pg_connection: Connection | None = None

    def get_scenario(self) -> EvaluationScenario:
        """Return the scenario of this test: ``POSTGRES_CONSISTENCY``."""
        return EvaluationScenario.POSTGRES_CONSISTENCY

    def create_sql_executors(
        self,
        config: ConsistencyTestConfiguration,
        connection: Connection,
        output_printer: OutputPrinter,
    ) -> SqlExecutors:
        """Create the executor pair for both databases.

        The first executor is built on ``connection`` and labelled ``"mz"``;
        the second is built on ``self.pg_connection``, labelled ``"pg"`` and
        created with ``is_mz=False``.

        Raises:
            RuntimeError: if ``self.pg_connection`` has not been set.
        """
        pg_connection = self.pg_connection
        if pg_connection is None:
            raise RuntimeError("Postgres connection is not initialized")

        mz_executor = create_sql_executor(config, connection, output_printer, "mz")
        pg_executor = create_sql_executor(
            config, pg_connection, output_printer, "pg", is_mz=False
        )
        return PgSqlExecutors(mz_executor, pg_executor)

    def create_result_comparator(
        self, ignore_filter: GenericInconsistencyIgnoreFilter
    ) -> ResultComparator:
        """Return a ``PostgresResultComparator`` using ``ignore_filter``."""
        comparator = PostgresResultComparator(ignore_filter)
        return comparator

    def create_inconsistency_ignore_filter(
        self, sql_executors: SqlExecutors
    ) -> GenericInconsistencyIgnoreFilter:
        """Return a new ``PgInconsistencyIgnoreFilter``.

        ``sql_executors`` is accepted but not used here.
        """
        ignore_filter = PgInconsistencyIgnoreFilter()
        return ignore_filter

    def create_evaluation_strategies(
        self, sql_executors: SqlExecutors
    ) -> list[EvaluationStrategy]:
        """Return the two strategies to compare: Materialize, then Postgres.

        ``sql_executors`` is accepted but not used here.
        """
        # Materialize side: a data-flow rendering evaluation with its display
        # name and object name overridden.
        mz_strategy = DataFlowRenderingEvaluation()
        mz_strategy.name = "Materialize evaluation"
        mz_strategy.simple_db_object_name = "mz_evaluation"

        # Postgres side.
        pg_strategy = PgEvaluation()

        return [mz_strategy, pg_strategy]

    def create_input_data(self) -> ConsistencyTestInputData:
        """Return the parent's input data plus the custom Postgres queries."""
        input_data = super().create_input_data()
        predefined = input_data.predefined_queries
        predefined.extend(create_custom_pg_consistency_queries())
        return input_data
def main() -> int:
    """Run the Postgres consistency test as a command-line program.

    Builds the argument parser with the connection options (``--mz-host``,
    ``--mz-port``, ``--pg-host``, ``--pg-port``, ``--pg-password``), hands it
    to ``test.parse_output_consistency_input_args`` to obtain the arguments,
    connects to Materialize as user ``materialize`` and to Postgres as user
    ``postgres``, and runs the consistency tests.

    Returns:
        0 if all tests passed; 1 if a connection attempt raised
        ``InterfaceError`` or if not all tests passed.
    """
    # Function-scope import: only needed to report connection failures.
    import sys

    test = PostgresConsistencyTest()

    parser = argparse.ArgumentParser(
        prog="postgres-consistency-test",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Test the consistency of Materialize and Postgres",
    )
    parser.add_argument("--mz-host", default="localhost", type=str)
    parser.add_argument("--mz-port", default=6875, type=int)
    parser.add_argument("--pg-host", default="localhost", type=str)
    parser.add_argument("--pg-port", default=5432, type=int)
    parser.add_argument("--pg-password", default=None, type=str)

    args = test.parse_output_consistency_input_args(parser)

    mz_db_user = "materialize"
    pg_db_user = "postgres"

    # The two connections are attempted separately so that a failure can be
    # reported against the database that actually caused it.
    try:
        mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
    except InterfaceError as e:
        print(
            f"Could not connect to Materialize at {args.mz_host}:{args.mz_port} "
            f"as user {mz_db_user}: {e}",
            file=sys.stderr,
        )
        return 1

    try:
        test.pg_connection = connect(
            args.pg_host, args.pg_port, pg_db_user, args.pg_password
        )
    except InterfaceError as e:
        # The password is deliberately left out of the message.
        print(
            f"Could not connect to Postgres at {args.pg_host}:{args.pg_port} "
            f"as user {pg_db_user}: {e}",
            file=sys.stderr,
        )
        # Nothing will use the Materialize connection anymore; release it.
        mz_connection.close()
        return 1

    result = test.run_output_consistency_tests(mz_connection, args)
    return 0 if result.all_passed() else 1
if __name__ == "__main__":
    # Use SystemExit directly: the ``exit`` builtin is injected by the ``site``
    # module for interactive use and is not guaranteed to exist in programs.
    raise SystemExit(main())
Functions
def main() ‑> int
-
Expand source code Browse git
def main() -> int:
    """Run the Postgres consistency test as a command-line program.

    Builds the argument parser with the connection options (``--mz-host``,
    ``--mz-port``, ``--pg-host``, ``--pg-port``, ``--pg-password``), hands it
    to ``test.parse_output_consistency_input_args`` to obtain the arguments,
    connects to Materialize as user ``materialize`` and to Postgres as user
    ``postgres``, and runs the consistency tests.

    Returns:
        0 if all tests passed; 1 if a connection attempt raised
        ``InterfaceError`` or if not all tests passed.
    """
    # Function-scope import: only needed to report connection failures.
    import sys

    test = PostgresConsistencyTest()

    parser = argparse.ArgumentParser(
        prog="postgres-consistency-test",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Test the consistency of Materialize and Postgres",
    )
    parser.add_argument("--mz-host", default="localhost", type=str)
    parser.add_argument("--mz-port", default=6875, type=int)
    parser.add_argument("--pg-host", default="localhost", type=str)
    parser.add_argument("--pg-port", default=5432, type=int)
    parser.add_argument("--pg-password", default=None, type=str)

    args = test.parse_output_consistency_input_args(parser)

    mz_db_user = "materialize"
    pg_db_user = "postgres"

    # The two connections are attempted separately so that a failure can be
    # reported against the database that actually caused it.
    try:
        mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
    except InterfaceError as e:
        print(
            f"Could not connect to Materialize at {args.mz_host}:{args.mz_port} "
            f"as user {mz_db_user}: {e}",
            file=sys.stderr,
        )
        return 1

    try:
        test.pg_connection = connect(
            args.pg_host, args.pg_port, pg_db_user, args.pg_password
        )
    except InterfaceError as e:
        # The password is deliberately left out of the message.
        print(
            f"Could not connect to Postgres at {args.pg_host}:{args.pg_port} "
            f"as user {pg_db_user}: {e}",
            file=sys.stderr,
        )
        # Nothing will use the Materialize connection anymore; release it.
        mz_connection.close()
        return 1

    result = test.run_output_consistency_tests(mz_connection, args)
    return 0 if result.all_passed() else 1
Classes
class PostgresConsistencyTest
-
Expand source code Browse git
class PostgresConsistencyTest(OutputConsistencyTest):
    """Output consistency test that compares Materialize against Postgres.

    The Postgres connection is not passed to the constructor; callers must
    assign ``pg_connection`` before ``create_sql_executors`` is invoked.
    """

    def __init__(self) -> None:
        # Assigned by the caller once the connection exists.
        self.pg_connection: Connection | None = None

    def get_scenario(self) -> EvaluationScenario:
        """Return the scenario of this test: ``POSTGRES_CONSISTENCY``."""
        return EvaluationScenario.POSTGRES_CONSISTENCY

    def create_sql_executors(
        self,
        config: ConsistencyTestConfiguration,
        connection: Connection,
        output_printer: OutputPrinter,
    ) -> SqlExecutors:
        """Create the executor pair for both databases.

        The first executor is built on ``connection`` and labelled ``"mz"``;
        the second is built on ``self.pg_connection``, labelled ``"pg"`` and
        created with ``is_mz=False``.

        Raises:
            RuntimeError: if ``self.pg_connection`` has not been set.
        """
        pg_connection = self.pg_connection
        if pg_connection is None:
            raise RuntimeError("Postgres connection is not initialized")

        mz_executor = create_sql_executor(config, connection, output_printer, "mz")
        pg_executor = create_sql_executor(
            config, pg_connection, output_printer, "pg", is_mz=False
        )
        return PgSqlExecutors(mz_executor, pg_executor)

    def create_result_comparator(
        self, ignore_filter: GenericInconsistencyIgnoreFilter
    ) -> ResultComparator:
        """Return a ``PostgresResultComparator`` using ``ignore_filter``."""
        comparator = PostgresResultComparator(ignore_filter)
        return comparator

    def create_inconsistency_ignore_filter(
        self, sql_executors: SqlExecutors
    ) -> GenericInconsistencyIgnoreFilter:
        """Return a new ``PgInconsistencyIgnoreFilter``.

        ``sql_executors`` is accepted but not used here.
        """
        ignore_filter = PgInconsistencyIgnoreFilter()
        return ignore_filter

    def create_evaluation_strategies(
        self, sql_executors: SqlExecutors
    ) -> list[EvaluationStrategy]:
        """Return the two strategies to compare: Materialize, then Postgres.

        ``sql_executors`` is accepted but not used here.
        """
        # Materialize side: a data-flow rendering evaluation with its display
        # name and object name overridden.
        mz_strategy = DataFlowRenderingEvaluation()
        mz_strategy.name = "Materialize evaluation"
        mz_strategy.simple_db_object_name = "mz_evaluation"

        # Postgres side.
        pg_strategy = PgEvaluation()

        return [mz_strategy, pg_strategy]

    def create_input_data(self) -> ConsistencyTestInputData:
        """Return the parent's input data plus the custom Postgres queries."""
        input_data = super().create_input_data()
        predefined = input_data.predefined_queries
        predefined.extend(create_custom_pg_consistency_queries())
        return input_data
Ancestors
Methods
def create_evaluation_strategies(self, sql_executors: SqlExecutors) ‑> list[EvaluationStrategy]
-
Expand source code Browse git
def create_evaluation_strategies(
    self, sql_executors: SqlExecutors
) -> list[EvaluationStrategy]:
    """Return the two strategies to compare: Materialize, then Postgres.

    ``sql_executors`` is accepted but not used here.
    """
    # Materialize side: a data-flow rendering evaluation with its display
    # name and object name overridden.
    mz_strategy = DataFlowRenderingEvaluation()
    mz_strategy.name = "Materialize evaluation"
    mz_strategy.simple_db_object_name = "mz_evaluation"

    # Postgres side.
    pg_strategy = PgEvaluation()

    return [mz_strategy, pg_strategy]
def create_inconsistency_ignore_filter(self, sql_executors: SqlExecutors) ‑> GenericInconsistencyIgnoreFilter
-
Expand source code Browse git
def create_inconsistency_ignore_filter(
    self, sql_executors: SqlExecutors
) -> GenericInconsistencyIgnoreFilter:
    """Return a new ``PgInconsistencyIgnoreFilter``.

    ``sql_executors`` is accepted but not used here.
    """
    ignore_filter = PgInconsistencyIgnoreFilter()
    return ignore_filter
def create_input_data(self) ‑> ConsistencyTestInputData
-
Expand source code Browse git
def create_input_data(self) -> ConsistencyTestInputData:
    """Return the parent's input data plus the custom Postgres queries."""
    input_data = super().create_input_data()
    predefined = input_data.predefined_queries
    predefined.extend(create_custom_pg_consistency_queries())
    return input_data
def create_result_comparator(self, ignore_filter: GenericInconsistencyIgnoreFilter) ‑> ResultComparator
-
Expand source code Browse git
def create_result_comparator(
    self, ignore_filter: GenericInconsistencyIgnoreFilter
) -> ResultComparator:
    """Return a ``PostgresResultComparator`` using ``ignore_filter``."""
    comparator = PostgresResultComparator(ignore_filter)
    return comparator
def create_sql_executors(self, config: ConsistencyTestConfiguration, connection: pg8000.legacy.Connection, output_printer: OutputPrinter) ‑> SqlExecutors
-
Expand source code Browse git
def create_sql_executors(
    self,
    config: ConsistencyTestConfiguration,
    connection: Connection,
    output_printer: OutputPrinter,
) -> SqlExecutors:
    """Create the executor pair for both databases.

    The first executor is built on ``connection`` and labelled ``"mz"``;
    the second is built on ``self.pg_connection``, labelled ``"pg"`` and
    created with ``is_mz=False``.

    Raises:
        RuntimeError: if ``self.pg_connection`` has not been set.
    """
    pg_connection = self.pg_connection
    if pg_connection is None:
        raise RuntimeError("Postgres connection is not initialized")

    mz_executor = create_sql_executor(config, connection, output_printer, "mz")
    pg_executor = create_sql_executor(
        config, pg_connection, output_printer, "pg", is_mz=False
    )
    return PgSqlExecutors(mz_executor, pg_executor)
def get_scenario(self) ‑> EvaluationScenario
-
Expand source code Browse git
def get_scenario(self) -> EvaluationScenario:
    """Return the scenario of this test: ``POSTGRES_CONSISTENCY``."""
    scenario = EvaluationScenario.POSTGRES_CONSISTENCY
    return scenario
Inherited members