misc.python.materialize.source_table_migration
Utilities for testing the source table migration
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition
from materialize.version_list import get_published_minor_mz_versions


def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Verify every source returned by the catalog query below.

    Args:
        c: Composition used to run the SQL statements.
        file: Label used only in the printed messages.
        fail: When true, the first failing check is re-raised; otherwise
            failures are only printed.
    """
    # Schema-qualified names of all sources whose catalog id starts with "u".
    source_names_rows = c.sql_query(
        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    )
    source_names = [row[0] for row in source_names_rows]

    print(f"Sources created in {file} are: {source_names}")

    c.sql("SET statement_timeout = '20s'")

    for source_name in source_names:
        _verify_source(c, file, source_name, fail=fail)


def _verify_source(
    c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
    """Check that a single source is queryable and was migrated.

    The source must be selectable, and its `SHOW CREATE SOURCE` output must
    contain neither "FOR TABLE" nor "FOR ALL TABLES". Unless the name ends
    with "_progress", it must also not contain "CREATE SUBSOURCE".

    Args:
        c: Composition used to run the SQL statements.
        file: Label used only in the printed messages.
        source_name: Name of the source, interpolated into the statements
            as-is.
        fail: When true, a failing check is re-raised after being printed.

    Raises:
        Exception: Whatever the queries or assertions raised, but only
            when `fail` is true.
    """
    try:
        print(f"Checking source: {source_name}")

        # must not crash
        statement = f"SELECT count(*) FROM {source_name};"
        c.sql_query(statement)

        statement = f"SHOW CREATE SOURCE {source_name};"
        result = c.sql_query(statement)
        # The statement text is read from the second column of the first row.
        sql = result[0][1]
        assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
        assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

        if not source_name.endswith("_progress"):
            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

        print("OK.")
    except Exception as e:
        # Deliberately broad: any failure is reported, and only escalated
        # when the caller asked for it.
        print(f"source-table-migration issue in {file}: {e}")

        if fail:
            raise


# Cache for get_old_image_for_source_table_migration_test().
_last_version: MzVersion | None = None


def get_old_image_for_source_table_migration_test() -> str:
    """Return the image name of an older published minor version.

    The version is determined on the first call and cached in the
    module-level `_last_version`. It is the first entry returned by
    `get_published_minor_mz_versions` that is lower than the version from
    `MzVersion.parse_cargo()`.
    NOTE(review): presumably that list is ordered newest first -- confirm.

    Raises:
        IndexError: If no published minor version is lower than the
            current version.
    """
    global _last_version
    if _last_version is None:
        current_version = MzVersion.parse_cargo()
        minor_versions = [
            v
            for v in get_published_minor_mz_versions(
                limit=4, exclude_current_minor_version=True
            )
            if v < current_version
        ]
        if not minor_versions:
            # Same exception type as the bare index lookup would raise,
            # but with a message that explains what is missing.
            raise IndexError(
                "no published minor version older than "
                f"{current_version} found for the source table migration test"
            )
        _last_version = minor_versions[0]
    return f"materialize/materialized:{_last_version}"


def get_new_image_for_source_table_migration_test() -> str | None:
    """Return the image to migrate to; always `None`.

    NOTE(review): presumably `None` tells the caller to use its default
    image -- confirm against the callers.
    """
    return None
def
verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False) -> None:
def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Run `_verify_source` on each source listed by the catalog query.

    Args:
        c: Composition used to run the SQL statements.
        file: Label used only in the printed messages.
        fail: Forwarded to `_verify_source` as its `fail` argument.
    """
    # Schema-qualified names of all sources whose catalog id starts with "u".
    query = "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    qualified_names = []
    for row in c.sql_query(query):
        qualified_names.append(row[0])

    print(f"Sources created in {file} are: {qualified_names}")

    c.sql("SET statement_timeout = '20s'")

    for qualified_name in qualified_names:
        _verify_source(c, file, qualified_name, fail=fail)
def
get_old_image_for_source_table_migration_test() -> str:
def get_old_image_for_source_table_migration_test() -> str:
    """Return the image name of an older published minor version.

    The version is determined on the first call and cached in the
    module-level `_last_version`. It is the first entry returned by
    `get_published_minor_mz_versions` that is lower than the version from
    `MzVersion.parse_cargo()`.
    NOTE(review): presumably that list is ordered newest first -- confirm.

    Raises:
        IndexError: If no published minor version is lower than the
            current version.
    """
    global _last_version
    if _last_version is None:
        current_version = MzVersion.parse_cargo()
        minor_versions = [
            v
            for v in get_published_minor_mz_versions(
                limit=4, exclude_current_minor_version=True
            )
            if v < current_version
        ]
        if not minor_versions:
            # Same exception type as the bare index lookup would raise,
            # but with a message that explains what is missing.
            raise IndexError(
                "no published minor version older than "
                f"{current_version} found for the source table migration test"
            )
        _last_version = minor_versions[0]
    return f"materialize/materialized:{_last_version}"
def
get_new_image_for_source_table_migration_test() -> str | None: