misc.python.materialize.parallel_workload.worker

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import random
 11import threading
 12import time
 13from collections import Counter, defaultdict
 14
 15import psycopg
 16import websocket
 17
 18from materialize.data_ingest.query_error import QueryError
 19from materialize.mzcompose.composition import Composition
 20from materialize.parallel_workload.action import (
 21    Action,
 22    ActionList,
 23    ReconnectAction,
 24    ws_connect,
 25)
 26from materialize.parallel_workload.database import Database
 27from materialize.parallel_workload.executor import Executor
 28
 29
 30class Worker:
 31    rng: random.Random
 32    action_list: ActionList | None
 33    actions: list[Action]
 34    weights: list[float]
 35    end_time: float
 36    num_queries: Counter[type[Action]]
 37    autocommit: bool
 38    system: bool
 39    exe: Executor | None
 40    ignored_errors: defaultdict[str, Counter[type[Action]]]
 41    composition: Composition | None
 42    occurred_exception: Exception | None
 43
 44    def __init__(
 45        self,
 46        rng: random.Random,
 47        actions: list[Action],
 48        weights: list[float],
 49        end_time: float,
 50        autocommit: bool,
 51        system: bool,
 52        composition: Composition | None,
 53        action_list: ActionList | None = None,
 54    ):
 55        self.rng = rng
 56        self.action_list = action_list
 57        self.actions = actions
 58        self.weights = weights
 59        self.end_time = end_time
 60        self.num_queries = Counter()
 61        self.autocommit = autocommit
 62        self.system = system
 63        self.ignored_errors = defaultdict(Counter)
 64        self.composition = composition
 65        self.occurred_exception = None
 66        self.exe = None
 67
 68    def run(
 69        self, host: str, pg_port: int, http_port: int, user: str, database: Database
 70    ) -> None:
 71        self.conn = psycopg.connect(
 72            host=host, port=pg_port, user=user, dbname="materialize"
 73        )
 74        self.conn.autocommit = self.autocommit
 75        cur = self.conn.cursor()
 76        ws = websocket.WebSocket()
 77        ws_conn_id, ws_secret_key = ws_connect(ws, host, http_port, user)
 78        self.exe = Executor(self.rng, cur, ws, database)
 79        self.exe.set_isolation("SERIALIZABLE")
 80        cur.execute("SET auto_route_catalog_queries TO false")
 81        if self.exe.use_ws:
 82            self.exe.pg_pid = ws_conn_id
 83        else:
 84            cur.execute("SELECT pg_backend_pid()")
 85            self.exe.pg_pid = cur.fetchall()[0][0]
 86
 87        while time.time() < self.end_time:
 88            action = self.rng.choices(self.actions, self.weights)[0]
 89            try:
 90                if self.exe.rollback_next:
 91                    try:
 92                        self.exe.rollback()
 93                    except QueryError as e:
 94                        if (
 95                            "Please disconnect and re-connect" in e.msg
 96                            or "server closed the connection unexpectedly" in e.msg
 97                            or "Can't create a connection to host" in e.msg
 98                            or "Connection refused" in e.msg
 99                            or "the connection is lost" in e.msg
100                            or "connection in transaction status INERROR" in e.msg
101                        ):
102                            self.exe.reconnect_next = True
103                            self.exe.rollback_next = False
104                            continue
105                    self.exe.rollback_next = False
106                if self.exe.reconnect_next:
107                    ReconnectAction(self.rng, self.composition, random_role=False).run(
108                        self.exe
109                    )
110                    self.exe.reconnect_next = False
111                if action.run(self.exe):
112                    self.num_queries[type(action)] += 1
113            except QueryError as e:
114                self.num_queries[type(action)] += 1
115                # TODO(def-): Reduce number of errors for temp tables/views? At
116                # least the errors will be fast, so maybe not worth it
117                # if "temp" in e.msg:
118                #     print(e.query)
119                #     print(e.msg)
120                for error_to_ignore in action.errors_to_ignore(self.exe):
121                    if error_to_ignore in e.msg:
122                        self.ignored_errors[error_to_ignore][type(action)] += 1
123                        if (
124                            "Please disconnect and re-connect" in e.msg
125                            or "server closed the connection unexpectedly" in e.msg
126                            or "Can't create a connection to host" in e.msg
127                            or "Connection refused" in e.msg
128                            or "the connection is lost" in e.msg
129                            or "connection in transaction status INERROR" in e.msg
130                        ):
131                            self.exe.reconnect_next = True
132                        else:
133                            self.exe.rollback_next = True
134                        break
135                else:
136                    thread_name = threading.current_thread().getName()
137                    self.occurred_exception = e
138                    print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
139                    raise
140            except Exception as e:
141                self.occurred_exception = e
142                raise e
143
144        self.exe.cur.connection.close()
145        if self.exe.ws:
146            self.exe.ws.close()
class Worker:
 31class Worker:
 32    rng: random.Random
 33    action_list: ActionList | None
 34    actions: list[Action]
 35    weights: list[float]
 36    end_time: float
 37    num_queries: Counter[type[Action]]
 38    autocommit: bool
 39    system: bool
 40    exe: Executor | None
 41    ignored_errors: defaultdict[str, Counter[type[Action]]]
 42    composition: Composition | None
 43    occurred_exception: Exception | None
 44
 45    def __init__(
 46        self,
 47        rng: random.Random,
 48        actions: list[Action],
 49        weights: list[float],
 50        end_time: float,
 51        autocommit: bool,
 52        system: bool,
 53        composition: Composition | None,
 54        action_list: ActionList | None = None,
 55    ):
 56        self.rng = rng
 57        self.action_list = action_list
 58        self.actions = actions
 59        self.weights = weights
 60        self.end_time = end_time
 61        self.num_queries = Counter()
 62        self.autocommit = autocommit
 63        self.system = system
 64        self.ignored_errors = defaultdict(Counter)
 65        self.composition = composition
 66        self.occurred_exception = None
 67        self.exe = None
 68
 69    def run(
 70        self, host: str, pg_port: int, http_port: int, user: str, database: Database
 71    ) -> None:
 72        self.conn = psycopg.connect(
 73            host=host, port=pg_port, user=user, dbname="materialize"
 74        )
 75        self.conn.autocommit = self.autocommit
 76        cur = self.conn.cursor()
 77        ws = websocket.WebSocket()
 78        ws_conn_id, ws_secret_key = ws_connect(ws, host, http_port, user)
 79        self.exe = Executor(self.rng, cur, ws, database)
 80        self.exe.set_isolation("SERIALIZABLE")
 81        cur.execute("SET auto_route_catalog_queries TO false")
 82        if self.exe.use_ws:
 83            self.exe.pg_pid = ws_conn_id
 84        else:
 85            cur.execute("SELECT pg_backend_pid()")
 86            self.exe.pg_pid = cur.fetchall()[0][0]
 87
 88        while time.time() < self.end_time:
 89            action = self.rng.choices(self.actions, self.weights)[0]
 90            try:
 91                if self.exe.rollback_next:
 92                    try:
 93                        self.exe.rollback()
 94                    except QueryError as e:
 95                        if (
 96                            "Please disconnect and re-connect" in e.msg
 97                            or "server closed the connection unexpectedly" in e.msg
 98                            or "Can't create a connection to host" in e.msg
 99                            or "Connection refused" in e.msg
100                            or "the connection is lost" in e.msg
101                            or "connection in transaction status INERROR" in e.msg
102                        ):
103                            self.exe.reconnect_next = True
104                            self.exe.rollback_next = False
105                            continue
106                    self.exe.rollback_next = False
107                if self.exe.reconnect_next:
108                    ReconnectAction(self.rng, self.composition, random_role=False).run(
109                        self.exe
110                    )
111                    self.exe.reconnect_next = False
112                if action.run(self.exe):
113                    self.num_queries[type(action)] += 1
114            except QueryError as e:
115                self.num_queries[type(action)] += 1
116                # TODO(def-): Reduce number of errors for temp tables/views? At
117                # least the errors will be fast, so maybe not worth it
118                # if "temp" in e.msg:
119                #     print(e.query)
120                #     print(e.msg)
121                for error_to_ignore in action.errors_to_ignore(self.exe):
122                    if error_to_ignore in e.msg:
123                        self.ignored_errors[error_to_ignore][type(action)] += 1
124                        if (
125                            "Please disconnect and re-connect" in e.msg
126                            or "server closed the connection unexpectedly" in e.msg
127                            or "Can't create a connection to host" in e.msg
128                            or "Connection refused" in e.msg
129                            or "the connection is lost" in e.msg
130                            or "connection in transaction status INERROR" in e.msg
131                        ):
132                            self.exe.reconnect_next = True
133                        else:
134                            self.exe.rollback_next = True
135                        break
136                else:
137                    thread_name = threading.current_thread().getName()
138                    self.occurred_exception = e
139                    print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
140                    raise
141            except Exception as e:
142                self.occurred_exception = e
143                raise e
144
145        self.exe.cur.connection.close()
146        if self.exe.ws:
147            self.exe.ws.close()
Worker( rng: random.Random, actions: list[materialize.parallel_workload.action.Action], weights: list[float], end_time: float, autocommit: bool, system: bool, composition: materialize.mzcompose.composition.Composition | None, action_list: materialize.parallel_workload.action.ActionList | None = None)
45    def __init__(
46        self,
47        rng: random.Random,
48        actions: list[Action],
49        weights: list[float],
50        end_time: float,
51        autocommit: bool,
52        system: bool,
53        composition: Composition | None,
54        action_list: ActionList | None = None,
55    ):
56        self.rng = rng
57        self.action_list = action_list
58        self.actions = actions
59        self.weights = weights
60        self.end_time = end_time
61        self.num_queries = Counter()
62        self.autocommit = autocommit
63        self.system = system
64        self.ignored_errors = defaultdict(Counter)
65        self.composition = composition
66        self.occurred_exception = None
67        self.exe = None
rng: random.Random
action_list: materialize.parallel_workload.action.ActionList | None
actions: list[materialize.parallel_workload.action.Action]
weights: list[float]
end_time: float
num_queries: collections.Counter[type[materialize.parallel_workload.action.Action]]
autocommit: bool
system: bool
exe: materialize.parallel_workload.executor.Executor | None
ignored_errors: collections.defaultdict[str, collections.Counter[type[materialize.parallel_workload.action.Action]]]
composition: materialize.mzcompose.composition.Composition | None
occurred_exception: Exception | None
def run( self, host: str, pg_port: int, http_port: int, user: str, database: materialize.parallel_workload.database.Database) -> None:
 69    def run(
 70        self, host: str, pg_port: int, http_port: int, user: str, database: Database
 71    ) -> None:
 72        self.conn = psycopg.connect(
 73            host=host, port=pg_port, user=user, dbname="materialize"
 74        )
 75        self.conn.autocommit = self.autocommit
 76        cur = self.conn.cursor()
 77        ws = websocket.WebSocket()
 78        ws_conn_id, ws_secret_key = ws_connect(ws, host, http_port, user)
 79        self.exe = Executor(self.rng, cur, ws, database)
 80        self.exe.set_isolation("SERIALIZABLE")
 81        cur.execute("SET auto_route_catalog_queries TO false")
 82        if self.exe.use_ws:
 83            self.exe.pg_pid = ws_conn_id
 84        else:
 85            cur.execute("SELECT pg_backend_pid()")
 86            self.exe.pg_pid = cur.fetchall()[0][0]
 87
 88        while time.time() < self.end_time:
 89            action = self.rng.choices(self.actions, self.weights)[0]
 90            try:
 91                if self.exe.rollback_next:
 92                    try:
 93                        self.exe.rollback()
 94                    except QueryError as e:
 95                        if (
 96                            "Please disconnect and re-connect" in e.msg
 97                            or "server closed the connection unexpectedly" in e.msg
 98                            or "Can't create a connection to host" in e.msg
 99                            or "Connection refused" in e.msg
100                            or "the connection is lost" in e.msg
101                            or "connection in transaction status INERROR" in e.msg
102                        ):
103                            self.exe.reconnect_next = True
104                            self.exe.rollback_next = False
105                            continue
106                    self.exe.rollback_next = False
107                if self.exe.reconnect_next:
108                    ReconnectAction(self.rng, self.composition, random_role=False).run(
109                        self.exe
110                    )
111                    self.exe.reconnect_next = False
112                if action.run(self.exe):
113                    self.num_queries[type(action)] += 1
114            except QueryError as e:
115                self.num_queries[type(action)] += 1
116                # TODO(def-): Reduce number of errors for temp tables/views? At
117                # least the errors will be fast, so maybe not worth it
118                # if "temp" in e.msg:
119                #     print(e.query)
120                #     print(e.msg)
121                for error_to_ignore in action.errors_to_ignore(self.exe):
122                    if error_to_ignore in e.msg:
123                        self.ignored_errors[error_to_ignore][type(action)] += 1
124                        if (
125                            "Please disconnect and re-connect" in e.msg
126                            or "server closed the connection unexpectedly" in e.msg
127                            or "Can't create a connection to host" in e.msg
128                            or "Connection refused" in e.msg
129                            or "the connection is lost" in e.msg
130                            or "connection in transaction status INERROR" in e.msg
131                        ):
132                            self.exe.reconnect_next = True
133                        else:
134                            self.exe.rollback_next = True
135                        break
136                else:
137                    thread_name = threading.current_thread().getName()
138                    self.occurred_exception = e
139                    print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
140                    raise
141            except Exception as e:
142                self.occurred_exception = e
143                raise e
144
145        self.exe.cur.connection.close()
146        if self.exe.ws:
147            self.exe.ws.close()