Module materialize.output_consistency.execution.query_execution_manager
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from datetime import datetime
from materialize.output_consistency.common.configuration import (
ConsistencyTestConfiguration,
)
from materialize.output_consistency.execution.evaluation_strategy import (
EvaluationStrategy,
)
from materialize.output_consistency.execution.sql_executor import SqlExecutionError
from materialize.output_consistency.execution.sql_executors import SqlExecutors
from materialize.output_consistency.execution.test_summary import ConsistencyTestSummary
from materialize.output_consistency.input_data.test_input_data import (
ConsistencyTestInputData,
)
from materialize.output_consistency.output.output_printer import OutputPrinter
from materialize.output_consistency.query.query_format import QueryOutputFormat
from materialize.output_consistency.query.query_result import (
QueryExecution,
QueryFailure,
QueryResult,
)
from materialize.output_consistency.query.query_template import QueryTemplate
from materialize.output_consistency.selection.selection import (
ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
)
from materialize.output_consistency.validation.result_comparator import ResultComparator
from materialize.output_consistency.validation.validation_outcome import (
ValidationOutcome,
ValidationVerdict,
)
class QueryExecutionManager:
    """Requests the execution of queries and handles transactions."""

    def __init__(
        self,
        evaluation_strategies: list[EvaluationStrategy],
        config: ConsistencyTestConfiguration,
        executors: SqlExecutors,
        comparator: ResultComparator,
        output_printer: OutputPrinter,
    ):
        self.evaluation_strategies = evaluation_strategies
        self.config = config
        self.executors = executors
        self.comparator = comparator
        self.output_printer = output_printer
        # Number of queries issued so far; drives transaction batching
        # (see execute_query) and 1-based query ids.
        self.query_counter = 0

    def setup_database_objects(
        self,
        input_data: ConsistencyTestInputData,
        evaluation_strategies: list[EvaluationStrategy],
    ) -> None:
        """Create the database sources each evaluation strategy needs.

        Prints every DDL statement before running it.

        :raises SqlExecutionError: if any DDL statement fails (re-raised
            after printing an error message).
        """
        self.output_printer.start_section("Setup code", collapsed=True)
        for strategy in evaluation_strategies:
            self.output_printer.print_info(
                f"Setup for evaluation strategy '{strategy.name}'"
            )
            executor = self.executors.get_executor(strategy)
            ddl_statements = strategy.generate_sources(input_data.types_input)

            for sql_statement in ddl_statements:
                self.output_printer.print_sql(sql_statement)
                try:
                    executor.ddl(sql_statement)
                except SqlExecutionError as e:
                    self.output_printer.print_error(
                        f"Setting up data structures failed ({e.message})!"
                    )
                    # Re-raise without resetting the original traceback.
                    raise

    def execute_query(
        self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary
    ) -> bool:
        """Run one query template against all strategies and record the outcome.

        Updates ``summary_to_update`` counters in place.

        :return: True if all comparisons passed (or were ignored), False if
            at least one comparison failed.
        """
        if self.query_counter % self.config.queries_per_tx == 0:
            # Commit after every couple of queries to keep transactions small.
            for strategy in self.evaluation_strategies:
                self.begin_tx(strategy, commit_previous_tx=self.query_counter > 0)

        query_index = self.query_counter
        self.query_counter += 1

        # May contain several outcomes if the query was split and retried.
        test_outcomes = self.fire_and_compare_query(
            query, query_index, "", self.evaluation_strategies
        )

        all_comparisons_passed = True
        for test_outcome in test_outcomes:
            summary_to_update.count_executed_query_templates += 1
            verdict = test_outcome.verdict()

            if verdict in {
                ValidationVerdict.SUCCESS,
                ValidationVerdict.SUCCESS_WITH_WARNINGS,
            }:
                summary_to_update.count_successful_query_templates += 1
            elif verdict == ValidationVerdict.IGNORED_FAILURE:
                summary_to_update.count_ignored_error_query_templates += 1
            elif verdict == ValidationVerdict.FAILURE:
                summary_to_update.add_failures(test_outcome.to_failure_details())
                all_comparisons_passed = False
            else:
                raise RuntimeError(f"Unexpected verdict: {verdict}")

            if test_outcome.has_warnings():
                summary_to_update.count_with_warning_query_templates += 1

        return all_comparisons_passed

    def complete(self, strategy: EvaluationStrategy) -> None:
        """Finish testing for a strategy by committing its open transaction."""
        self.commit_tx(strategy)

    def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) -> None:
        """Start a new SERIALIZABLE transaction, optionally committing the previous one."""
        if commit_previous_tx:
            self.commit_tx(strategy)

        self.executors.get_executor(strategy).begin_tx("SERIALIZABLE")

    def commit_tx(
        self,
        strategy: EvaluationStrategy,
    ) -> None:
        """Commit the current transaction unless autocommit is enabled."""
        if not self.config.use_autocommit:
            self.executors.get_executor(strategy).commit()

    def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) -> None:
        """Roll back the current transaction and optionally begin a fresh one."""
        # do this also when in autocommit mode
        self.executors.get_executor(strategy).rollback()

        if start_new_tx:
            self.begin_tx(strategy, commit_previous_tx=False)

    def fire_and_compare_query(
        self,
        query_template: QueryTemplate,
        query_index: int,
        query_id_prefix: str,
        evaluation_strategies: list[EvaluationStrategy],
    ) -> list[ValidationOutcome]:
        """Execute a query with every strategy and compare the results.

        May return multiple outcomes if a query is split and retried.
        Will always return at least one outcome.
        """
        query_no = query_index + 1
        query_id = f"{query_id_prefix}{query_no}"
        query_execution = QueryExecution(query_template, query_id)

        if self.config.verbose_output:
            # print the header with the query before the execution to have
            # information if it gets stuck
            self.print_query_header(query_id, query_execution, collapsed=True)

        for strategy in evaluation_strategies:
            sql_query_string = query_template.to_sql(
                strategy,
                QueryOutputFormat.SINGLE_LINE,
                ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
            )

            start_time = datetime.now()
            try:
                data = self.executors.get_executor(strategy).query(sql_query_string)
                duration = self._get_duration_in_seconds(start_time)
                result = QueryResult(
                    strategy, sql_query_string, query_template.column_count(), data
                )
                query_execution.outcomes.append(result)
                query_execution.durations.append(duration)
            except SqlExecutionError as err:
                duration = self._get_duration_in_seconds(start_time)
                # A failed statement aborts the transaction, so recover with
                # a rollback and start a new transaction.
                self.rollback_tx(strategy, start_new_tx=True)

                if self.shall_retry_with_smaller_query(query_template):
                    # abort and retry with smaller query
                    # this will discard the outcomes of all strategies
                    return self.split_and_retry_queries(
                        query_template, query_id, evaluation_strategies
                    )

                failure = QueryFailure(
                    strategy, sql_query_string, query_template.column_count(), str(err)
                )
                query_execution.outcomes.append(failure)
                query_execution.durations.append(duration)

        if self.config.dry_run:
            return [ValidationOutcome()]

        validation_outcome = self.comparator.compare_results(query_execution)
        self.print_test_result(query_id, query_execution, validation_outcome)

        return [validation_outcome]

    def _get_duration_in_seconds(self, start_time: datetime) -> float:
        """Return the elapsed wall-clock time since ``start_time`` in seconds.

        Note: formerly named ``_get_duration_in_ms``, but it has always
        returned seconds (``timedelta.total_seconds()``); renamed to match
        the actual unit.
        """
        end_time = datetime.now()
        duration = end_time - start_time
        return duration.total_seconds()

    def shall_retry_with_smaller_query(self, query_template: QueryTemplate) -> bool:
        """Decide whether a failed query should be split and retried."""
        return (
            self.config.split_and_retry_on_db_error
            and query_template.column_count() > 1
        )

    def split_and_retry_queries(
        self,
        original_query_template: QueryTemplate,
        query_id: str,
        evaluation_strategies: list[EvaluationStrategy],
    ) -> list[ValidationOutcome]:
        """Split a failed query's SELECT expressions in half and retry each part.

        :raises RuntimeError: if the query has fewer than two expressions and
            therefore cannot be split.
        """
        args_count = len(original_query_template.select_expressions)

        if args_count < 2:
            raise RuntimeError("Cannot split query")

        # This code assumes that the query failed because of the SELECT expressions.
        # However, it is also possible that the where condition was invalid.
        # This is ignored as of now.

        arg_split_index = args_count // 2
        # Sub-query 1 gets the first half of the expressions, sub-query 2 the
        # second half, matching the ".1"/".2" query-id suffixes below.
        query1_args = original_query_template.select_expressions[:arg_split_index]
        query2_args = original_query_template.select_expressions[arg_split_index:]

        new_query_template1 = QueryTemplate(
            False,
            query1_args,
            original_query_template.where_expression,
            original_query_template.storage_layout,
            original_query_template.contains_aggregations,
            original_query_template.row_selection,
            original_query_template.offset,
            original_query_template.limit,
        )
        new_query_template2 = QueryTemplate(
            False,
            query2_args,
            original_query_template.where_expression,
            original_query_template.storage_layout,
            original_query_template.contains_aggregations,
            original_query_template.row_selection,
            original_query_template.offset,
            original_query_template.limit,
        )

        query_id_prefix = f"{query_id}."
        validation_outcomes = []
        validation_outcomes.extend(
            self.fire_and_compare_query(
                new_query_template1, 0, query_id_prefix, evaluation_strategies
            )
        )
        validation_outcomes.extend(
            self.fire_and_compare_query(
                new_query_template2, 1, query_id_prefix, evaluation_strategies
            )
        )
        return validation_outcomes

    def print_query_header(
        self,
        query_id: str,
        query_execution: QueryExecution,
        collapsed: bool,
        status: str | None = None,
        flush: bool = False,
    ) -> None:
        """Print the section header and the generic SQL of a query."""
        status = "" if status is None else f" ({status})"
        self.output_printer.start_section(
            f"Test query #{query_id}{status}", collapsed=collapsed
        )
        self.output_printer.print_sql(query_execution.generic_sql)

        if flush:
            self.output_printer.flush()

    def print_test_result(
        self,
        query_id: str,
        query_execution: QueryExecution,
        validation_outcome: ValidationOutcome,
    ) -> None:
        """Print the outcome of one query comparison.

        Plain successes are skipped unless verbose output is enabled.
        """
        if (
            validation_outcome.verdict() == ValidationVerdict.SUCCESS
            and not self.config.verbose_output
        ):
            return

        status = validation_outcome.verdict().name

        if not self.config.verbose_output:
            # In verbose mode, the header has already been printed
            self.print_query_header(
                query_id,
                query_execution,
                collapsed=validation_outcome.verdict().accepted(),
                status=status,
                flush=True,
            )

        result_desc = "PASSED" if validation_outcome.verdict().accepted() else "FAILED"
        success_reason = (
            f" ({validation_outcome.success_reason})"
            if validation_outcome.success_reason is not None
            and validation_outcome.verdict().succeeded()
            else ""
        )

        self.output_printer.print_info(
            f"Test with query #{query_id} {result_desc}{success_reason}."
        )

        duration_info = ", ".join(
            f"{duration:.3f}" for duration in query_execution.durations
        )
        self.output_printer.print_info(f"Durations: {duration_info}")

        if validation_outcome.has_errors():
            self.output_printer.print_info(
                f"Errors:\n{validation_outcome.error_output()}"
            )

            if self.config.print_reproduction_code:
                self.output_printer.print_reproduction_code(validation_outcome.errors)

        if validation_outcome.has_warnings():
            self.output_printer.print_info(
                f"Warnings:\n{validation_outcome.warning_output()}"
            )

        if validation_outcome.has_remarks():
            self.output_printer.print_info(
                f"Remarks:\n{validation_outcome.remark_output()}"
            )
Classes
class QueryExecutionManager (evaluation_strategies: list[EvaluationStrategy], config: ConsistencyTestConfiguration, executors: SqlExecutors, comparator: ResultComparator, output_printer: OutputPrinter)
-
Requests the execution of queries and handles transactions
Expand source code Browse git
class QueryExecutionManager: """Requests the execution of queries and handles transactions""" def __init__( self, evaluation_strategies: list[EvaluationStrategy], config: ConsistencyTestConfiguration, executors: SqlExecutors, comparator: ResultComparator, output_printer: OutputPrinter, ): self.evaluation_strategies = evaluation_strategies self.config = config self.executors = executors self.comparator = comparator self.output_printer = output_printer self.query_counter = 0 def setup_database_objects( self, input_data: ConsistencyTestInputData, evaluation_strategies: list[EvaluationStrategy], ) -> None: self.output_printer.start_section("Setup code", collapsed=True) for strategy in evaluation_strategies: self.output_printer.print_info( f"Setup for evaluation strategy '{strategy.name}'" ) executor = self.executors.get_executor(strategy) ddl_statements = strategy.generate_sources(input_data.types_input) for sql_statement in ddl_statements: self.output_printer.print_sql(sql_statement) try: executor.ddl(sql_statement) except SqlExecutionError as e: self.output_printer.print_error( f"Setting up data structures failed ({e.message})!" 
) raise e def execute_query( self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary ) -> bool: if self.query_counter % self.config.queries_per_tx == 0: # commit after every couple of queries for strategy in self.evaluation_strategies: self.begin_tx(strategy, commit_previous_tx=self.query_counter > 0) query_index = self.query_counter self.query_counter += 1 test_outcomes = self.fire_and_compare_query( query, query_index, "", self.evaluation_strategies ) all_comparisons_passed = True for test_outcome in test_outcomes: summary_to_update.count_executed_query_templates += 1 verdict = test_outcome.verdict() if verdict in { ValidationVerdict.SUCCESS, ValidationVerdict.SUCCESS_WITH_WARNINGS, }: summary_to_update.count_successful_query_templates += 1 elif verdict == ValidationVerdict.IGNORED_FAILURE: summary_to_update.count_ignored_error_query_templates += 1 elif verdict == ValidationVerdict.FAILURE: summary_to_update.add_failures(test_outcome.to_failure_details()) all_comparisons_passed = False else: raise RuntimeError(f"Unexpected verdict: {verdict}") if test_outcome.has_warnings(): summary_to_update.count_with_warning_query_templates += 1 return all_comparisons_passed def complete(self, strategy: EvaluationStrategy) -> None: self.commit_tx(strategy) def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) -> None: if commit_previous_tx: self.commit_tx(strategy) self.executors.get_executor(strategy).begin_tx("SERIALIZABLE") def commit_tx( self, strategy: EvaluationStrategy, ) -> None: if not self.config.use_autocommit: self.executors.get_executor(strategy).commit() def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) -> None: # do this also when in autocommit mode self.executors.get_executor(strategy).rollback() if start_new_tx: self.begin_tx(strategy, commit_previous_tx=False) # May return multiple outcomes if a query is split and retried. Will always return at least one outcome. 
def fire_and_compare_query( self, query_template: QueryTemplate, query_index: int, query_id_prefix: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: query_no = query_index + 1 query_id = f"{query_id_prefix}{query_no}" query_execution = QueryExecution(query_template, query_id) if self.config.verbose_output: # print the header with the query before the execution to have information if it gets stuck self.print_query_header(query_id, query_execution, collapsed=True) for strategy in evaluation_strategies: sql_query_string = query_template.to_sql( strategy, QueryOutputFormat.SINGLE_LINE, ALL_QUERY_COLUMNS_BY_INDEX_SELECTION, ) start_time = datetime.now() try: data = self.executors.get_executor(strategy).query(sql_query_string) duration = self._get_duration_in_ms(start_time) result = QueryResult( strategy, sql_query_string, query_template.column_count(), data ) query_execution.outcomes.append(result) query_execution.durations.append(duration) except SqlExecutionError as err: duration = self._get_duration_in_ms(start_time) self.rollback_tx(strategy, start_new_tx=True) if self.shall_retry_with_smaller_query(query_template): # abort and retry with smaller query # this will discard the outcomes of all strategies return self.split_and_retry_queries( query_template, query_id, evaluation_strategies ) failure = QueryFailure( strategy, sql_query_string, query_template.column_count(), str(err) ) query_execution.outcomes.append(failure) query_execution.durations.append(duration) if self.config.dry_run: return [ValidationOutcome()] validation_outcome = self.comparator.compare_results(query_execution) self.print_test_result(query_id, query_execution, validation_outcome) return [validation_outcome] def _get_duration_in_ms(self, start_time: datetime) -> float: end_time = datetime.now() duration = end_time - start_time return duration.total_seconds() def shall_retry_with_smaller_query(self, query_template: QueryTemplate) -> bool: return ( 
self.config.split_and_retry_on_db_error and query_template.column_count() > 1 ) def split_and_retry_queries( self, original_query_template: QueryTemplate, query_id: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: args_count = len(original_query_template.select_expressions) if args_count < 2: raise RuntimeError("Cannot split query") # This code assumes that the query failed because of the SELECT expressions. # However, it is also possible that the where condition was invalid. # This is ignored as of now. arg_split_index = int(args_count / 2) query1_args = original_query_template.select_expressions[arg_split_index:] query2_args = original_query_template.select_expressions[:arg_split_index] new_query_template1 = QueryTemplate( False, query1_args, original_query_template.where_expression, original_query_template.storage_layout, original_query_template.contains_aggregations, original_query_template.row_selection, original_query_template.offset, original_query_template.limit, ) new_query_template2 = QueryTemplate( False, query2_args, original_query_template.where_expression, original_query_template.storage_layout, original_query_template.contains_aggregations, original_query_template.row_selection, original_query_template.offset, original_query_template.limit, ) query_id_prefix = f"{query_id}." 
validation_outcomes = [] validation_outcomes.extend( self.fire_and_compare_query( new_query_template1, 0, query_id_prefix, evaluation_strategies ) ) validation_outcomes.extend( self.fire_and_compare_query( new_query_template2, 1, query_id_prefix, evaluation_strategies ) ) return validation_outcomes def print_query_header( self, query_id: str, query_execution: QueryExecution, collapsed: bool, status: str | None = None, flush: bool = False, ) -> None: status = "" if status is None else f" ({status})" self.output_printer.start_section( f"Test query #{query_id}{status}", collapsed=collapsed ) self.output_printer.print_sql(query_execution.generic_sql) if flush: self.output_printer.flush() def print_test_result( self, query_id: str, query_execution: QueryExecution, validation_outcome: ValidationOutcome, ) -> None: if ( validation_outcome.verdict() == ValidationVerdict.SUCCESS and not self.config.verbose_output ): return status = validation_outcome.verdict().name if not self.config.verbose_output: # In verbose mode, the header has already been printed self.print_query_header( query_id, query_execution, collapsed=validation_outcome.verdict().accepted(), status=status, flush=True, ) result_desc = "PASSED" if validation_outcome.verdict().accepted() else "FAILED" success_reason = ( f" ({validation_outcome.success_reason})" if validation_outcome.success_reason is not None and validation_outcome.verdict().succeeded() else "" ) self.output_printer.print_info( f"Test with query #{query_id} {result_desc}{success_reason}." 
) duration_info = ", ".join( f"{duration:.3f}" for duration in query_execution.durations ) self.output_printer.print_info(f"Durations: {duration_info}") if validation_outcome.has_errors(): self.output_printer.print_info( f"Errors:\n{validation_outcome.error_output()}" ) if self.config.print_reproduction_code: self.output_printer.print_reproduction_code(validation_outcome.errors) if validation_outcome.has_warnings(): self.output_printer.print_info( f"Warnings:\n{validation_outcome.warning_output()}" ) if validation_outcome.has_remarks(): self.output_printer.print_info( f"Remarks:\n{validation_outcome.remark_output()}" )
Methods
def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) ‑> None
-
Expand source code Browse git
def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) -> None: if commit_previous_tx: self.commit_tx(strategy) self.executors.get_executor(strategy).begin_tx("SERIALIZABLE")
def commit_tx(self, strategy: EvaluationStrategy) ‑> None
-
Expand source code Browse git
def commit_tx( self, strategy: EvaluationStrategy, ) -> None: if not self.config.use_autocommit: self.executors.get_executor(strategy).commit()
def complete(self, strategy: EvaluationStrategy) ‑> None
-
Expand source code Browse git
def complete(self, strategy: EvaluationStrategy) -> None: self.commit_tx(strategy)
def execute_query(self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary) ‑> bool
-
Expand source code Browse git
def execute_query( self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary ) -> bool: if self.query_counter % self.config.queries_per_tx == 0: # commit after every couple of queries for strategy in self.evaluation_strategies: self.begin_tx(strategy, commit_previous_tx=self.query_counter > 0) query_index = self.query_counter self.query_counter += 1 test_outcomes = self.fire_and_compare_query( query, query_index, "", self.evaluation_strategies ) all_comparisons_passed = True for test_outcome in test_outcomes: summary_to_update.count_executed_query_templates += 1 verdict = test_outcome.verdict() if verdict in { ValidationVerdict.SUCCESS, ValidationVerdict.SUCCESS_WITH_WARNINGS, }: summary_to_update.count_successful_query_templates += 1 elif verdict == ValidationVerdict.IGNORED_FAILURE: summary_to_update.count_ignored_error_query_templates += 1 elif verdict == ValidationVerdict.FAILURE: summary_to_update.add_failures(test_outcome.to_failure_details()) all_comparisons_passed = False else: raise RuntimeError(f"Unexpected verdict: {verdict}") if test_outcome.has_warnings(): summary_to_update.count_with_warning_query_templates += 1 return all_comparisons_passed
def fire_and_compare_query(self, query_template: QueryTemplate, query_index: int, query_id_prefix: str, evaluation_strategies: list[EvaluationStrategy]) ‑> list[ValidationOutcome]
-
Expand source code Browse git
def fire_and_compare_query( self, query_template: QueryTemplate, query_index: int, query_id_prefix: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: query_no = query_index + 1 query_id = f"{query_id_prefix}{query_no}" query_execution = QueryExecution(query_template, query_id) if self.config.verbose_output: # print the header with the query before the execution to have information if it gets stuck self.print_query_header(query_id, query_execution, collapsed=True) for strategy in evaluation_strategies: sql_query_string = query_template.to_sql( strategy, QueryOutputFormat.SINGLE_LINE, ALL_QUERY_COLUMNS_BY_INDEX_SELECTION, ) start_time = datetime.now() try: data = self.executors.get_executor(strategy).query(sql_query_string) duration = self._get_duration_in_ms(start_time) result = QueryResult( strategy, sql_query_string, query_template.column_count(), data ) query_execution.outcomes.append(result) query_execution.durations.append(duration) except SqlExecutionError as err: duration = self._get_duration_in_ms(start_time) self.rollback_tx(strategy, start_new_tx=True) if self.shall_retry_with_smaller_query(query_template): # abort and retry with smaller query # this will discard the outcomes of all strategies return self.split_and_retry_queries( query_template, query_id, evaluation_strategies ) failure = QueryFailure( strategy, sql_query_string, query_template.column_count(), str(err) ) query_execution.outcomes.append(failure) query_execution.durations.append(duration) if self.config.dry_run: return [ValidationOutcome()] validation_outcome = self.comparator.compare_results(query_execution) self.print_test_result(query_id, query_execution, validation_outcome) return [validation_outcome]
def print_query_header(self, query_id: str, query_execution: QueryExecution, collapsed: bool, status: str | None = None, flush: bool = False) ‑> None
-
Expand source code Browse git
def print_query_header( self, query_id: str, query_execution: QueryExecution, collapsed: bool, status: str | None = None, flush: bool = False, ) -> None: status = "" if status is None else f" ({status})" self.output_printer.start_section( f"Test query #{query_id}{status}", collapsed=collapsed ) self.output_printer.print_sql(query_execution.generic_sql) if flush: self.output_printer.flush()
def print_test_result(self, query_id: str, query_execution: QueryExecution, validation_outcome: ValidationOutcome) ‑> None
-
Expand source code Browse git
def print_test_result( self, query_id: str, query_execution: QueryExecution, validation_outcome: ValidationOutcome, ) -> None: if ( validation_outcome.verdict() == ValidationVerdict.SUCCESS and not self.config.verbose_output ): return status = validation_outcome.verdict().name if not self.config.verbose_output: # In verbose mode, the header has already been printed self.print_query_header( query_id, query_execution, collapsed=validation_outcome.verdict().accepted(), status=status, flush=True, ) result_desc = "PASSED" if validation_outcome.verdict().accepted() else "FAILED" success_reason = ( f" ({validation_outcome.success_reason})" if validation_outcome.success_reason is not None and validation_outcome.verdict().succeeded() else "" ) self.output_printer.print_info( f"Test with query #{query_id} {result_desc}{success_reason}." ) duration_info = ", ".join( f"{duration:.3f}" for duration in query_execution.durations ) self.output_printer.print_info(f"Durations: {duration_info}") if validation_outcome.has_errors(): self.output_printer.print_info( f"Errors:\n{validation_outcome.error_output()}" ) if self.config.print_reproduction_code: self.output_printer.print_reproduction_code(validation_outcome.errors) if validation_outcome.has_warnings(): self.output_printer.print_info( f"Warnings:\n{validation_outcome.warning_output()}" ) if validation_outcome.has_remarks(): self.output_printer.print_info( f"Remarks:\n{validation_outcome.remark_output()}" )
def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) ‑> None
-
Expand source code Browse git
def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) -> None: # do this also when in autocommit mode self.executors.get_executor(strategy).rollback() if start_new_tx: self.begin_tx(strategy, commit_previous_tx=False)
def setup_database_objects(self, input_data: ConsistencyTestInputData, evaluation_strategies: list[EvaluationStrategy]) ‑> None
-
Expand source code Browse git
def setup_database_objects( self, input_data: ConsistencyTestInputData, evaluation_strategies: list[EvaluationStrategy], ) -> None: self.output_printer.start_section("Setup code", collapsed=True) for strategy in evaluation_strategies: self.output_printer.print_info( f"Setup for evaluation strategy '{strategy.name}'" ) executor = self.executors.get_executor(strategy) ddl_statements = strategy.generate_sources(input_data.types_input) for sql_statement in ddl_statements: self.output_printer.print_sql(sql_statement) try: executor.ddl(sql_statement) except SqlExecutionError as e: self.output_printer.print_error( f"Setting up data structures failed ({e.message})!" ) raise e
def shall_retry_with_smaller_query(self, query_template: QueryTemplate) ‑> bool
-
Expand source code Browse git
def shall_retry_with_smaller_query(self, query_template: QueryTemplate) -> bool: return ( self.config.split_and_retry_on_db_error and query_template.column_count() > 1 )
def split_and_retry_queries(self, original_query_template: QueryTemplate, query_id: str, evaluation_strategies: list[EvaluationStrategy]) ‑> list[ValidationOutcome]
-
Expand source code Browse git
def split_and_retry_queries( self, original_query_template: QueryTemplate, query_id: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: args_count = len(original_query_template.select_expressions) if args_count < 2: raise RuntimeError("Cannot split query") # This code assumes that the query failed because of the SELECT expressions. # However, it is also possible that the where condition was invalid. # This is ignored as of now. arg_split_index = int(args_count / 2) query1_args = original_query_template.select_expressions[arg_split_index:] query2_args = original_query_template.select_expressions[:arg_split_index] new_query_template1 = QueryTemplate( False, query1_args, original_query_template.where_expression, original_query_template.storage_layout, original_query_template.contains_aggregations, original_query_template.row_selection, original_query_template.offset, original_query_template.limit, ) new_query_template2 = QueryTemplate( False, query2_args, original_query_template.where_expression, original_query_template.storage_layout, original_query_template.contains_aggregations, original_query_template.row_selection, original_query_template.offset, original_query_template.limit, ) query_id_prefix = f"{query_id}." validation_outcomes = [] validation_outcomes.extend( self.fire_and_compare_query( new_query_template1, 0, query_id_prefix, evaluation_strategies ) ) validation_outcomes.extend( self.fire_and_compare_query( new_query_template2, 1, query_id_prefix, evaluation_strategies ) ) return validation_outcomes