misc.python.materialize.source_table_migration
Utilities for testing the source table migration
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""

from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
    c: Composition,
    file: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Verify every listed source after the source table migration.

    Looks up the schema-qualified names of all sources whose id starts with
    ``u`` (presumably the user-created ones -- confirm against the catalog
    documentation), prints them, sets the statement timeout to 20 seconds
    and then checks each source in turn.

    Args:
        c: Composition used to run the SQL.
        file: Label that is included in the printed messages.
        fail: If true, a verification problem is re-raised (which stops the
            iteration) instead of only being printed.
        service: Service to run the SQL against; passed through unchanged.
    """
    source_names = [
        record[0]
        for record in c.sql_query(
            "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
            service=service,
        )
    ]

    print(f"Sources created in {file} are: {source_names}")

    # Applied only after the listing query above, i.e. to the per-source checks.
    c.sql("SET statement_timeout = '20s'", service=service)

    for qualified_name in source_names:
        _verify_source(c, file, qualified_name, fail=fail, service=service)


def _verify_source(
    c: Composition,
    file: str,
    source_name: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Check the ``SHOW CREATE SOURCE`` output of a single source.

    The statement text must contain neither ``FOR TABLE`` nor
    ``FOR ALL TABLES``. Unless the source name ends with ``_progress``, it
    must not contain ``CREATE SUBSOURCE`` either.

    Any exception raised while checking (a failed assertion as well as a
    query error) is printed together with ``file``; it is re-raised only
    when ``fail`` is true.

    Args:
        c: Composition used to run the SQL.
        file: Label that is included in the printed messages.
        source_name: Name of the source, interpolated into the statement as is.
        fail: Whether to re-raise a detected problem.
        service: Service to run the SQL against; passed through unchanged.
    """
    try:
        print(f"Checking source: {source_name}")

        rows = c.sql_query(f"SHOW CREATE SOURCE {source_name};", service=service)
        # The statement text is read from the second column of the first row.
        first_row = rows[0]
        sql = first_row[1]

        assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
        assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

        # Names ending in "_progress" are exempt from the subsource check.
        is_progress = source_name.endswith("_progress")
        if not is_progress:
            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

        print("OK.")
    except Exception as e:
        print(f"source-table-migration issue in {file}: {str(e)}")

        if not fail:
            # Best-effort mode: the problem has been reported, carry on.
            return
        raise e
def
verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False, service: str | None = None) -> None:
def verify_sources_after_source_table_migration(
    c: Composition,
    file: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Verify every listed source after the source table migration.

    Looks up the schema-qualified names of all sources whose id starts with
    ``u`` (presumably the user-created ones -- confirm against the catalog
    documentation), prints them, sets the statement timeout to 20 seconds
    and then hands each name to ``_verify_source``.

    Args:
        c: Composition used to run the SQL.
        file: Label that is included in the printed message.
        fail: Forwarded to ``_verify_source`` unchanged.
        service: Service to run the SQL against; passed through unchanged.
    """
    source_names = [
        record[0]
        for record in c.sql_query(
            "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
            service=service,
        )
    ]

    print(f"Sources created in {file} are: {source_names}")

    # Applied only after the listing query above, i.e. to the per-source checks.
    c.sql("SET statement_timeout = '20s'", service=service)

    for qualified_name in source_names:
        _verify_source(c, file, qualified_name, fail=fail, service=service)