Module materialize.feature_benchmark.scenarios.subscribe
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from random import getrandbits
from textwrap import dedent
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
from materialize.feature_benchmark.scenario import Scenario
class SubscribeParallel(Scenario):
"""Feature benchmarks related to SUBSCRIBE"""
SCALE = 2 # So 100 concurrent SUBSCRIBEs by default, limited by #18261
FIXED_SCALE = True
def benchmark(self) -> MeasurementSource:
return Td(
self.create_subscribe_source()
+ "\n".join(
[
dedent(
f"""
$ postgres-connect name=conn{i} url=postgres://materialize:materialize@${{testdrive.materialize-sql-addr}}
$ postgres-execute connection=conn{i}
# STRICT SERIALIZABLE is affected by #18353
START TRANSACTION ISOLATION LEVEL SERIALIZABLE;
DECLARE c{i} CURSOR FOR SUBSCRIBE s1
"""
)
for i in range(0, self.n())
]
)
+ self.insert()
# We measure from here ...
+ dedent(
"""
> SELECT COUNT(*) FROM s1;
/* A */
1
"""
)
+ "\n".join(
[
dedent(
f"""
$ postgres-execute connection=conn{i}
FETCH ALL FROM c{i};
"""
)
for i in range(0, self.n())
]
)
# ... to here
+ dedent(
"""
> SELECT 1
/* B */
1
"""
)
)
def create_subscribe_source(self) -> str:
assert False, "Scenario needs to provide a source/table definition"
def insert(self) -> str:
assert False, "Scenario needs to provide insert()"
class SubscribeParallelTable(SubscribeParallel):
def create_subscribe_source(self) -> str:
return dedent(
"""
> DROP TABLE IF EXISTS s1;
> CREATE TABLE s1 (f1 TEXT);
"""
)
def insert(self) -> str:
return "> INSERT INTO s1 VALUES (REPEAT('x', 1024))\n"
class SubscribeParallelTableWithIndex(SubscribeParallel):
def create_subscribe_source(self) -> str:
return dedent(
"""
> DROP TABLE IF EXISTS s1;
> CREATE TABLE s1 (f1 INTEGER);
> CREATE DEFAULT INDEX ON s1;
"""
)
def insert(self) -> str:
return "> INSERT INTO s1 VALUES (123)\n"
class SubscribeParallelKafka(SubscribeParallel):
def create_subscribe_source(self) -> str:
# As we are doing `kafka-ingest` in the middle of the benchmark() method
# we must always use a unique topic to ensure isolation between the individal
# measurements
self._unique_topic_id = getrandbits(64)
return dedent(
f"""
# Separate topic for each Mz instance
$ kafka-create-topic topic=subscribe-kafka-{self._unique_topic_id}
>[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> DROP CLUSTER IF EXISTS source_cluster CASCADE;
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> DROP SOURCE IF EXISTS s1 CASCADE;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-subscribe-kafka-{self._unique_topic_id}-${{testdrive.seed}}')
FORMAT BYTES ENVELOPE NONE;
> CREATE DEFAULT INDEX ON s1;
"""
)
def insert(self) -> str:
return dedent(
f"""
$ kafka-ingest format=bytes topic=subscribe-kafka-{self._unique_topic_id}
123
"""
)
Functions
def getrandbits(k, /)
-
getrandbits(k) -> x. Generates an int with k random bits.
Classes
class SubscribeParallel (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to SUBSCRIBE
Expand source code Browse git
class SubscribeParallel(Scenario): """Feature benchmarks related to SUBSCRIBE""" SCALE = 2 # So 100 concurrent SUBSCRIBEs by default, limited by #18261 FIXED_SCALE = True def benchmark(self) -> MeasurementSource: return Td( self.create_subscribe_source() + "\n".join( [ dedent( f""" $ postgres-connect name=conn{i} url=postgres://materialize:materialize@${{testdrive.materialize-sql-addr}} $ postgres-execute connection=conn{i} # STRICT SERIALIZABLE is affected by #18353 START TRANSACTION ISOLATION LEVEL SERIALIZABLE; DECLARE c{i} CURSOR FOR SUBSCRIBE s1 """ ) for i in range(0, self.n()) ] ) + self.insert() # We measure from here ... + dedent( """ > SELECT COUNT(*) FROM s1; /* A */ 1 """ ) + "\n".join( [ dedent( f""" $ postgres-execute connection=conn{i} FETCH ALL FROM c{i}; """ ) for i in range(0, self.n()) ] ) # ... to here + dedent( """ > SELECT 1 /* B */ 1 """ ) ) def create_subscribe_source(self) -> str: assert False, "Scenario needs to provide a source/table definition" def insert(self) -> str: assert False, "Scenario needs to provide insert()"
Ancestors
Subclasses
Class variables
var FIXED_SCALE : bool
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( self.create_subscribe_source() + "\n".join( [ dedent( f""" $ postgres-connect name=conn{i} url=postgres://materialize:materialize@${{testdrive.materialize-sql-addr}} $ postgres-execute connection=conn{i} # STRICT SERIALIZABLE is affected by #18353 START TRANSACTION ISOLATION LEVEL SERIALIZABLE; DECLARE c{i} CURSOR FOR SUBSCRIBE s1 """ ) for i in range(0, self.n()) ] ) + self.insert() # We measure from here ... + dedent( """ > SELECT COUNT(*) FROM s1; /* A */ 1 """ ) + "\n".join( [ dedent( f""" $ postgres-execute connection=conn{i} FETCH ALL FROM c{i}; """ ) for i in range(0, self.n()) ] ) # ... to here + dedent( """ > SELECT 1 /* B */ 1 """ ) )
def create_subscribe_source(self) ‑> str
-
Expand source code Browse git
def create_subscribe_source(self) -> str: assert False, "Scenario needs to provide a source/table definition"
def insert(self) ‑> str
-
Expand source code Browse git
def insert(self) -> str: assert False, "Scenario needs to provide insert()"
Inherited members
class SubscribeParallelKafka (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to SUBSCRIBE
Expand source code Browse git
class SubscribeParallelKafka(SubscribeParallel): def create_subscribe_source(self) -> str: # As we are doing `kafka-ingest` in the middle of the benchmark() method # we must always use a unique topic to ensure isolation between the individal # measurements self._unique_topic_id = getrandbits(64) return dedent( f""" # Separate topic for each Mz instance $ kafka-create-topic topic=subscribe-kafka-{self._unique_topic_id} >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE; > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > DROP SOURCE IF EXISTS s1 CASCADE; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-subscribe-kafka-{self._unique_topic_id}-${{testdrive.seed}}') FORMAT BYTES ENVELOPE NONE; > CREATE DEFAULT INDEX ON s1; """ ) def insert(self) -> str: return dedent( f""" $ kafka-ingest format=bytes topic=subscribe-kafka-{self._unique_topic_id} 123 """ )
Ancestors
Methods
def create_subscribe_source(self) ‑> str
-
Expand source code Browse git
def create_subscribe_source(self) -> str: # As we are doing `kafka-ingest` in the middle of the benchmark() method # we must always use a unique topic to ensure isolation between the individal # measurements self._unique_topic_id = getrandbits(64) return dedent( f""" # Separate topic for each Mz instance $ kafka-create-topic topic=subscribe-kafka-{self._unique_topic_id} >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE; > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > DROP SOURCE IF EXISTS s1 CASCADE; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-subscribe-kafka-{self._unique_topic_id}-${{testdrive.seed}}') FORMAT BYTES ENVELOPE NONE; > CREATE DEFAULT INDEX ON s1; """ )
def insert(self) ‑> str
-
Expand source code Browse git
def insert(self) -> str: return dedent( f""" $ kafka-ingest format=bytes topic=subscribe-kafka-{self._unique_topic_id} 123 """ )
Inherited members
class SubscribeParallelTable (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to SUBSCRIBE
Expand source code Browse git
class SubscribeParallelTable(SubscribeParallel): def create_subscribe_source(self) -> str: return dedent( """ > DROP TABLE IF EXISTS s1; > CREATE TABLE s1 (f1 TEXT); """ ) def insert(self) -> str: return "> INSERT INTO s1 VALUES (REPEAT('x', 1024))\n"
Ancestors
Methods
def create_subscribe_source(self) ‑> str
-
Expand source code Browse git
def create_subscribe_source(self) -> str: return dedent( """ > DROP TABLE IF EXISTS s1; > CREATE TABLE s1 (f1 TEXT); """ )
def insert(self) ‑> str
-
Expand source code Browse git
def insert(self) -> str: return "> INSERT INTO s1 VALUES (REPEAT('x', 1024))\n"
Inherited members
class SubscribeParallelTableWithIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to SUBSCRIBE
Expand source code Browse git
class SubscribeParallelTableWithIndex(SubscribeParallel): def create_subscribe_source(self) -> str: return dedent( """ > DROP TABLE IF EXISTS s1; > CREATE TABLE s1 (f1 INTEGER); > CREATE DEFAULT INDEX ON s1; """ ) def insert(self) -> str: return "> INSERT INTO s1 VALUES (123)\n"
Ancestors
Methods
def create_subscribe_source(self) ‑> str
-
Expand source code Browse git
def create_subscribe_source(self) -> str: return dedent( """ > DROP TABLE IF EXISTS s1; > CREATE TABLE s1 (f1 INTEGER); > CREATE DEFAULT INDEX ON s1; """ )
def insert(self) ‑> str
-
Expand source code Browse git
def insert(self) -> str: return "> INSERT INTO s1 VALUES (123)\n"
Inherited members