misc.python.materialize.parallel_workload.worker
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import threading
import time
from collections import Counter, defaultdict

import psycopg
import websocket

from materialize.data_ingest.query_error import QueryError
from materialize.mzcompose.composition import Composition
from materialize.parallel_workload.action import (
    Action,
    ActionList,
    ReconnectAction,
    ws_connect,
)
from materialize.parallel_workload.database import Database
from materialize.parallel_workload.executor import Executor


class Worker:
    """Execute weighted-random parallel-workload actions until a deadline.

    :meth:`run` opens a SQL connection and a websocket for the worker, then
    keeps picking actions from ``actions`` (weighted by ``weights``) until
    ``end_time``. Query errors that the chosen action lists in
    ``errors_to_ignore`` are counted and recovered from by rolling back or
    reconnecting. Any other error is recorded in ``occurred_exception`` and
    re-raised.
    """

    rng: random.Random
    action_list: ActionList | None
    actions: list[Action]
    weights: list[float]
    end_time: float
    # Per action type: runs that returned truthy plus runs that raised QueryError.
    num_queries: Counter[type[Action]]
    autocommit: bool
    system: bool
    # Created by run(); None before that.
    exe: Executor | None
    # Ignored error substring -> action type -> number of occurrences.
    ignored_errors: defaultdict[str, Counter[type[Action]]]
    composition: Composition | None
    # The exception that terminated run(), if any.
    occurred_exception: Exception | None

    def __init__(
        self,
        rng: random.Random,
        actions: list[Action],
        weights: list[float],
        end_time: float,
        autocommit: bool,
        system: bool,
        composition: Composition | None,
        action_list: ActionList | None = None,
    ):
        """Store the worker configuration; no connection is opened here.

        Args:
            rng: Random source for action selection; also handed to the
                Executor and to ReconnectAction.
            actions: Candidate actions, chosen with ``random.choices``.
            weights: Relative weights for ``actions`` (same order).
            end_time: ``time.time()`` deadline after which run() stops
                picking new actions.
            autocommit: Applied to the psycopg connection opened by run().
            system: Stored as-is; not read within this class.
            composition: Passed to ReconnectAction when reconnecting.
            action_list: Stored as-is; not read within this class.
        """
        self.rng = rng
        self.action_list = action_list
        self.actions = actions
        self.weights = weights
        self.end_time = end_time
        self.num_queries = Counter()
        self.autocommit = autocommit
        self.system = system
        self.ignored_errors = defaultdict(Counter)
        self.composition = composition
        self.occurred_exception = None
        self.exe = None

    def run(
        self, host: str, pg_port: int, http_port: int, user: str, database: Database
    ) -> None:
        """Connect as ``user`` and run random actions until ``end_time``.

        A psycopg connection (``pg_port``) and a websocket (``http_port``) are
        opened against ``host`` and wrapped in an Executor stored on
        ``self.exe``. Both connections are closed when the loop finishes and
        also when an exception escapes. On that path, cleanup errors are
        suppressed so that they cannot mask the original failure.

        Raises:
            QueryError: a query failed with an error the chosen action does
                not list in ``errors_to_ignore``.
            Exception: any other failure while running an action (also
                stored in ``self.occurred_exception``).
        """
        # Constructing the WebSocket object does no I/O; it exists before the
        # try block so cleanup can always reach it.
        ws = websocket.WebSocket()
        self.conn = psycopg.connect(
            host=host, port=pg_port, user=user, dbname="materialize"
        )
        exe: Executor | None = None
        try:
            self.conn.autocommit = self.autocommit
            cur = self.conn.cursor()
            ws_conn_id, _ws_secret_key = ws_connect(ws, host, http_port, user)
            exe = Executor(self.rng, cur, ws, database)
            self.exe = exe
            exe.set_isolation("SERIALIZABLE")
            cur.execute("SET auto_route_catalog_queries TO false")
            if exe.use_ws:
                exe.pg_pid = ws_conn_id
            else:
                cur.execute("SELECT pg_backend_pid()")
                exe.pg_pid = cur.fetchall()[0][0]
            self._run_actions(exe)
        except BaseException:
            # Release the connections before propagating, but never let a
            # failure during cleanup replace the error that got us here.
            try:
                self._close_connections(exe, ws)
            except Exception:
                pass
            raise
        self._close_connections(exe, ws)

    def _run_actions(self, exe: Executor) -> None:
        """Pick and execute weighted-random actions until ``self.end_time``."""
        while time.time() < self.end_time:
            action = self.rng.choices(self.actions, self.weights)[0]
            try:
                if exe.rollback_next:
                    try:
                        exe.rollback()
                    except QueryError as e:
                        if self._is_connection_error(e.msg):
                            exe.reconnect_next = True
                        exe.rollback_next = False
                        # Skip the chosen action; the next iteration picks anew.
                        continue
                    exe.rollback_next = False
                if exe.reconnect_next:
                    ReconnectAction(self.rng, self.composition, random_role=False).run(
                        exe
                    )
                    exe.reconnect_next = False
                if action.run(exe):
                    self.num_queries[type(action)] += 1
            except QueryError as e:
                self.num_queries[type(action)] += 1
                # TODO(def-): Reduce number of errors for temp tables/views? At
                # least the errors will be fast, so maybe not worth it
                for error_to_ignore in action.errors_to_ignore(exe):
                    if error_to_ignore in e.msg:
                        self.ignored_errors[error_to_ignore][type(action)] += 1
                        # An expected error: recover before the next action.
                        if self._is_connection_error(e.msg):
                            exe.reconnect_next = True
                        else:
                            exe.rollback_next = True
                        break
                else:
                    thread_name = threading.current_thread().name
                    self.occurred_exception = e
                    print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
                    raise
            except Exception as e:
                self.occurred_exception = e
                raise

    def _close_connections(
        self, exe: Executor | None, ws: websocket.WebSocket
    ) -> None:
        """Close the SQL connection and websocket opened by :meth:`run`."""
        if exe is None:
            # Setup failed before the Executor existed; close what run() opened.
            self.conn.close()
            ws.close()
            return
        # Close through the executor rather than self.conn, presumably because
        # ReconnectAction may swap exe.cur for a new connection -- confirm.
        exe.cur.connection.close()
        if exe.ws:
            exe.ws.close()

    @staticmethod
    def _is_connection_error(msg: str) -> bool:
        """Return whether ``msg`` is an error answered by reconnecting.

        Matching errors set ``reconnect_next`` instead of ``rollback_next``.
        """
        return any(
            fragment in msg
            for fragment in (
                "Please disconnect and re-connect",
                "server closed the connection unexpectedly",
                "Can't create a connection to host",
                "Connection refused",
                "the connection is lost",
                "connection in transaction status INERROR",
            )
        )
class
Worker:
class Worker:
    """Execute weighted-random parallel-workload actions until a deadline.

    :meth:`run` opens a SQL connection and a websocket for the worker, then
    keeps picking actions from ``actions`` (weighted by ``weights``) until
    ``end_time``. Query errors that the chosen action lists in
    ``errors_to_ignore`` are counted and recovered from by rolling back or
    reconnecting. Any other error is recorded in ``occurred_exception`` and
    re-raised.
    """

    rng: random.Random
    action_list: ActionList | None
    actions: list[Action]
    weights: list[float]
    end_time: float
    # Per action type: runs that returned truthy plus runs that raised QueryError.
    num_queries: Counter[type[Action]]
    autocommit: bool
    system: bool
    # Created by run(); None before that.
    exe: Executor | None
    # Ignored error substring -> action type -> number of occurrences.
    ignored_errors: defaultdict[str, Counter[type[Action]]]
    composition: Composition | None
    # The exception that terminated run(), if any.
    occurred_exception: Exception | None

    def __init__(
        self,
        rng: random.Random,
        actions: list[Action],
        weights: list[float],
        end_time: float,
        autocommit: bool,
        system: bool,
        composition: Composition | None,
        action_list: ActionList | None = None,
    ):
        """Store the worker configuration; no connection is opened here.

        Args:
            rng: Random source for action selection; also handed to the
                Executor and to ReconnectAction.
            actions: Candidate actions, chosen with ``random.choices``.
            weights: Relative weights for ``actions`` (same order).
            end_time: ``time.time()`` deadline after which run() stops
                picking new actions.
            autocommit: Applied to the psycopg connection opened by run().
            system: Stored as-is; not read within this class.
            composition: Passed to ReconnectAction when reconnecting.
            action_list: Stored as-is; not read within this class.
        """
        self.rng = rng
        self.action_list = action_list
        self.actions = actions
        self.weights = weights
        self.end_time = end_time
        self.num_queries = Counter()
        self.autocommit = autocommit
        self.system = system
        self.ignored_errors = defaultdict(Counter)
        self.composition = composition
        self.occurred_exception = None
        self.exe = None

    def run(
        self, host: str, pg_port: int, http_port: int, user: str, database: Database
    ) -> None:
        """Connect as ``user`` and run random actions until ``end_time``.

        A psycopg connection (``pg_port``) and a websocket (``http_port``) are
        opened against ``host`` and wrapped in an Executor stored on
        ``self.exe``. Both connections are closed when the loop finishes and
        also when an exception escapes. On that path, cleanup errors are
        suppressed so that they cannot mask the original failure.

        Raises:
            QueryError: a query failed with an error the chosen action does
                not list in ``errors_to_ignore``.
            Exception: any other failure while running an action (also
                stored in ``self.occurred_exception``).
        """
        # Constructing the WebSocket object does no I/O; it exists before the
        # try block so cleanup can always reach it.
        ws = websocket.WebSocket()
        self.conn = psycopg.connect(
            host=host, port=pg_port, user=user, dbname="materialize"
        )
        exe: Executor | None = None
        try:
            self.conn.autocommit = self.autocommit
            cur = self.conn.cursor()
            ws_conn_id, _ws_secret_key = ws_connect(ws, host, http_port, user)
            exe = Executor(self.rng, cur, ws, database)
            self.exe = exe
            exe.set_isolation("SERIALIZABLE")
            cur.execute("SET auto_route_catalog_queries TO false")
            if exe.use_ws:
                exe.pg_pid = ws_conn_id
            else:
                cur.execute("SELECT pg_backend_pid()")
                exe.pg_pid = cur.fetchall()[0][0]
            self._run_actions(exe)
        except BaseException:
            # Release the connections before propagating, but never let a
            # failure during cleanup replace the error that got us here.
            try:
                self._close_connections(exe, ws)
            except Exception:
                pass
            raise
        self._close_connections(exe, ws)

    def _run_actions(self, exe: Executor) -> None:
        """Pick and execute weighted-random actions until ``self.end_time``."""
        while time.time() < self.end_time:
            action = self.rng.choices(self.actions, self.weights)[0]
            try:
                if exe.rollback_next:
                    try:
                        exe.rollback()
                    except QueryError as e:
                        if self._is_connection_error(e.msg):
                            exe.reconnect_next = True
                        exe.rollback_next = False
                        # Skip the chosen action; the next iteration picks anew.
                        continue
                    exe.rollback_next = False
                if exe.reconnect_next:
                    ReconnectAction(self.rng, self.composition, random_role=False).run(
                        exe
                    )
                    exe.reconnect_next = False
                if action.run(exe):
                    self.num_queries[type(action)] += 1
            except QueryError as e:
                self.num_queries[type(action)] += 1
                # TODO(def-): Reduce number of errors for temp tables/views? At
                # least the errors will be fast, so maybe not worth it
                for error_to_ignore in action.errors_to_ignore(exe):
                    if error_to_ignore in e.msg:
                        self.ignored_errors[error_to_ignore][type(action)] += 1
                        # An expected error: recover before the next action.
                        if self._is_connection_error(e.msg):
                            exe.reconnect_next = True
                        else:
                            exe.rollback_next = True
                        break
                else:
                    thread_name = threading.current_thread().name
                    self.occurred_exception = e
                    print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
                    raise
            except Exception as e:
                self.occurred_exception = e
                raise

    def _close_connections(
        self, exe: Executor | None, ws: websocket.WebSocket
    ) -> None:
        """Close the SQL connection and websocket opened by :meth:`run`."""
        if exe is None:
            # Setup failed before the Executor existed; close what run() opened.
            self.conn.close()
            ws.close()
            return
        # Close through the executor rather than self.conn, presumably because
        # ReconnectAction may swap exe.cur for a new connection -- confirm.
        exe.cur.connection.close()
        if exe.ws:
            exe.ws.close()

    @staticmethod
    def _is_connection_error(msg: str) -> bool:
        """Return whether ``msg`` is an error answered by reconnecting.

        Matching errors set ``reconnect_next`` instead of ``rollback_next``.
        """
        return any(
            fragment in msg
            for fragment in (
                "Please disconnect and re-connect",
                "server closed the connection unexpectedly",
                "Can't create a connection to host",
                "Connection refused",
                "the connection is lost",
                "connection in transaction status INERROR",
            )
        )
Worker( rng: random.Random, actions: list[materialize.parallel_workload.action.Action], weights: list[float], end_time: float, autocommit: bool, system: bool, composition: materialize.mzcompose.composition.Composition | None, action_list: materialize.parallel_workload.action.ActionList | None = None)
45 def __init__( 46 self, 47 rng: random.Random, 48 actions: list[Action], 49 weights: list[float], 50 end_time: float, 51 autocommit: bool, 52 system: bool, 53 composition: Composition | None, 54 action_list: ActionList | None = None, 55 ): 56 self.rng = rng 57 self.action_list = action_list 58 self.actions = actions 59 self.weights = weights 60 self.end_time = end_time 61 self.num_queries = Counter() 62 self.autocommit = autocommit 63 self.system = system 64 self.ignored_errors = defaultdict(Counter) 65 self.composition = composition 66 self.occurred_exception = None 67 self.exe = None
ignored_errors: collections.defaultdict[str, collections.Counter[type[materialize.parallel_workload.action.Action]]]
def
run( self, host: str, pg_port: int, http_port: int, user: str, database: materialize.parallel_workload.database.Database) -> None:
69 def run( 70 self, host: str, pg_port: int, http_port: int, user: str, database: Database 71 ) -> None: 72 self.conn = psycopg.connect( 73 host=host, port=pg_port, user=user, dbname="materialize" 74 ) 75 self.conn.autocommit = self.autocommit 76 cur = self.conn.cursor() 77 ws = websocket.WebSocket() 78 ws_conn_id, ws_secret_key = ws_connect(ws, host, http_port, user) 79 self.exe = Executor(self.rng, cur, ws, database) 80 self.exe.set_isolation("SERIALIZABLE") 81 cur.execute("SET auto_route_catalog_queries TO false") 82 if self.exe.use_ws: 83 self.exe.pg_pid = ws_conn_id 84 else: 85 cur.execute("SELECT pg_backend_pid()") 86 self.exe.pg_pid = cur.fetchall()[0][0] 87 88 while time.time() < self.end_time: 89 action = self.rng.choices(self.actions, self.weights)[0] 90 try: 91 if self.exe.rollback_next: 92 try: 93 self.exe.rollback() 94 except QueryError as e: 95 if ( 96 "Please disconnect and re-connect" in e.msg 97 or "server closed the connection unexpectedly" in e.msg 98 or "Can't create a connection to host" in e.msg 99 or "Connection refused" in e.msg 100 or "the connection is lost" in e.msg 101 or "connection in transaction status INERROR" in e.msg 102 ): 103 self.exe.reconnect_next = True 104 self.exe.rollback_next = False 105 continue 106 self.exe.rollback_next = False 107 if self.exe.reconnect_next: 108 ReconnectAction(self.rng, self.composition, random_role=False).run( 109 self.exe 110 ) 111 self.exe.reconnect_next = False 112 if action.run(self.exe): 113 self.num_queries[type(action)] += 1 114 except QueryError as e: 115 self.num_queries[type(action)] += 1 116 # TODO(def-): Reduce number of errors for temp tables/views? 
At 117 # least the errors will be fast, so maybe not worth it 118 # if "temp" in e.msg: 119 # print(e.query) 120 # print(e.msg) 121 for error_to_ignore in action.errors_to_ignore(self.exe): 122 if error_to_ignore in e.msg: 123 self.ignored_errors[error_to_ignore][type(action)] += 1 124 if ( 125 "Please disconnect and re-connect" in e.msg 126 or "server closed the connection unexpectedly" in e.msg 127 or "Can't create a connection to host" in e.msg 128 or "Connection refused" in e.msg 129 or "the connection is lost" in e.msg 130 or "connection in transaction status INERROR" in e.msg 131 ): 132 self.exe.reconnect_next = True 133 else: 134 self.exe.rollback_next = True 135 break 136 else: 137 thread_name = threading.current_thread().getName() 138 self.occurred_exception = e 139 print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}") 140 raise 141 except Exception as e: 142 self.occurred_exception = e 143 raise e 144 145 self.exe.cur.connection.close() 146 if self.exe.ws: 147 self.exe.ws.close()