misc.python.materialize.source_table_migration
Utilities for testing the source table migration
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10"""Utilities for testing the source table migration""" 11from materialize.mzcompose.composition import Composition 12 13 14def verify_sources_after_source_table_migration( 15 c: Composition, 16 file: str, 17 fail: bool = False, 18 service: str | None = None, 19) -> None: 20 source_names_rows = c.sql_query( 21 "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';", 22 service=service, 23 ) 24 source_names = [row[0] for row in source_names_rows] 25 26 print(f"Sources created in {file} are: {source_names}") 27 28 c.sql("SET statement_timeout = '20s'", service=service) 29 30 for source_name in source_names: 31 _verify_source(c, file, source_name, fail=fail, service=service) 32 33 34def _verify_source( 35 c: Composition, 36 file: str, 37 source_name: str, 38 fail: bool = False, 39 service: str | None = None, 40) -> None: 41 try: 42 print(f"Checking source: {source_name}") 43 44 statement = f"SHOW CREATE SOURCE {source_name};" 45 result = c.sql_query(statement, service=service) 46 sql = result[0][1] 47 assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}" 48 assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}" 49 50 if not source_name.endswith("_progress"): 51 assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}" 52 53 print("OK.") 54 except Exception as e: 55 print(f"source-table-migration issue in {file}: {str(e)}") 56 57 if fail: 58 raise e
def
verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False, service: str | None = None) -> None:
15def verify_sources_after_source_table_migration( 16 c: Composition, 17 file: str, 18 fail: bool = False, 19 service: str | None = None, 20) -> None: 21 source_names_rows = c.sql_query( 22 "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';", 23 service=service, 24 ) 25 source_names = [row[0] for row in source_names_rows] 26 27 print(f"Sources created in {file} are: {source_names}") 28 29 c.sql("SET statement_timeout = '20s'", service=service) 30 31 for source_name in source_names: 32 _verify_source(c, file, source_name, fail=fail, service=service)