Module materialize.benches.avro_ingest
Ingest some Avro records, and report how long it takes
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
"""Ingest some Avro records, and report how long it takes"""
import argparse
import json
import os
import time
from typing import IO, NamedTuple, cast
import docker
import pg8000
import psutil
import requests
from docker.models.containers import Container
from pg8000.dbapi import ProgrammingError
from materialize import MZ_ROOT, mzbuild, ui
def wait_for_confluent(host: str) -> None:
    """Block until the Confluent schema registry at `host` is reachable.

    Polls the registry's ``/subjects`` endpoint on port 8081 every 5
    seconds and returns once it answers HTTP 200. Errors are printed and
    retried forever; there is no overall deadline.
    """
    url = f"http://{host}:8081/subjects"
    while True:
        print(f"Checking if schema registry at {url} is accessible...")
        try:
            # A per-request timeout keeps a hung (accepting but not
            # responding) registry from stalling this poll indefinitely.
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                print("Schema registry is ready")
                return
        except requests.exceptions.RequestException as e:
            # RequestException covers ConnectionError and the Timeout the
            # new `timeout=` can raise.
            print(e)
        # Sleep on *every* unsuccessful attempt (error or non-200 status)
        # so a registry that is up but unhealthy isn't polled in a busy loop.
        time.sleep(5)
def mz_proc(container: Container) -> psutil.Process:
    """Locate the `materialized` process running inside `container`.

    Refreshes the container's attributes, then searches the descendants of
    the container's init process for one named "materialized".

    Raises:
        RuntimeError: if no such process exists.
    """
    container.reload()
    init_pid = container.attrs["State"]["Pid"]  # type: ignore
    init_proc = psutil.Process(init_pid)
    found = next(
        (
            child
            for child in init_proc.children(recursive=True)
            if child.name() == "materialized"
        ),
        None,
    )
    if found is None:
        raise RuntimeError("Couldn't find materialized pid")
    return found
class PrevStats(NamedTuple):
    """Cumulative resource counters captured at a previous measurement.

    `print_stats` subtracts a `PrevStats` from the current readings to
    report per-trial deltas.
    """

    wall_time: float  # wall-clock timestamp (seconds since the epoch)
    user_cpu: float  # cumulative user-mode CPU seconds
    system_cpu: float  # cumulative kernel-mode CPU seconds
def print_stats(container: Container, prev: PrevStats, file: IO) -> PrevStats:
    """Append one CSV row of resource usage for materialized to `file`.

    The row contains the current RSS and VMS of the materialized process
    plus the user-CPU, system-CPU, and wall-time deltas since `prev`.
    Returns the new cumulative snapshot to pass into the next call.
    """
    process = mz_proc(container)
    mem = process.memory_info()
    cpu_times = process.cpu_times()
    current = PrevStats(time.time(), cpu_times.user, cpu_times.system)
    fields = (
        mem.rss,
        mem.vms,
        current.user_cpu - prev.user_cpu,
        current.system_cpu - prev.system_cpu,
        current.wall_time - prev.wall_time,
    )
    print(",".join(str(value) for value in fields), file=file, flush=True)
    return current
def main() -> None:
    """Run the Avro ingest benchmark.

    Generates Avro records into Kafka with kgen, starts a materialized
    container that ingests them via a Kafka source, and appends one row of
    resource-usage stats per trial to results.csv.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--confluent-host",
        default="confluent",
        help="The hostname of a machine running the Confluent Platform",
    )
    parser.add_argument(
        "-n",
        "--trials",
        default=1,
        type=int,
        help="Number of measurements to take",
    )
    parser.add_argument(
        "-r",
        "--records",
        default=1000000,
        type=int,
        help="Number of Avro records to generate",
    )
    args = parser.parse_args()

    os.chdir(MZ_ROOT)
    coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
    repo = mzbuild.Repository(MZ_ROOT, coverage=coverage)

    wait_for_confluent(args.confluent_host)

    images = ["kgen", "materialized"]
    deps = repo.resolve_dependencies([repo.images[name] for name in images])
    deps.acquire()

    docker_client = docker.from_env()

    # NOTE: We have to override the type below because if `detach=True` it
    # returns a Container, and the typechecker doesn't know that.
    mz_container: Container = cast(
        Container,
        docker_client.containers.run(
            deps["materialized"].spec(),
            detach=True,
            network_mode="host",
        ),
    )

    # Without detach=True this run blocks until kgen finishes producing.
    docker_client.containers.run(
        deps["kgen"].spec(),
        [
            f"--num-records={args.records}",
            f"--bootstrap-server={args.confluent_host}:9092",
            f"--schema-registry-url=http://{args.confluent_host}:8081",
            "--topic=bench_data",
            "--keys=avro",
            "--values=avro",
            f"--avro-schema={VALUE_SCHEMA}",
            f"--avro-distribution={VALUE_DISTRIBUTION}",
            f"--avro-key-schema={KEY_SCHEMA}",
            f"--avro-key-distribution={KEY_DISTRIBUTION}",
        ],
        network_mode="host",
    )

    conn = pg8000.connect(host="localhost", port=6875, user="materialize")
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            cur.execute(
                f"""CREATE CONNECTION IF NOT EXISTS csr_conn
                TO CONFLUENT SCHEMA REGISTRY (
                    URL 'http://{args.confluent_host}:8081'
                )"""
            )
            cur.execute(
                f"""CREATE CONNECTION kafka_conn
                TO KAFKA (BROKER '{args.confluent_host}:9092', SECURITY PROTOCOL PLAINTEXT)"""
            )
            cur.execute(
                """CREATE SOURCE src
                FROM KAFKA CONNECTION kafka_conn (TOPIC 'bench_data')
                FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn"""
            )

            # FIX: open results.csv with a context manager so it is flushed
            # and closed even if a query below raises (it previously leaked).
            with open("results.csv", "w") as results_file:
                print(
                    "Rss,Vms,User Cpu,System Cpu,Wall Time",
                    file=results_file,
                    flush=True,
                )
                prev = PrevStats(time.time(), 0.0, 0.0)
                for _ in range(args.trials):
                    cur.execute("DROP VIEW IF EXISTS cnt")
                    cur.execute(
                        "CREATE MATERIALIZED VIEW cnt AS SELECT count(*) FROM src"
                    )
                    # Poll once per second until every generated record has
                    # been ingested into the view.
                    while True:
                        try:
                            cur.execute("SELECT * FROM cnt")
                            row = cur.fetchone()
                            assert row is not None
                            n = row[0]
                            if n >= args.records:
                                break
                        except ProgrammingError:
                            # The view may not be queryable yet; retry.
                            pass
                        time.sleep(1)
                    prev = print_stats(mz_container, prev, results_file)
    finally:
        # FIX: the connection was previously never closed.
        conn.close()
# Avro schema for the record *keys*: a flat record of two long fields.
# These JSON strings are passed verbatim to kgen on its command line.
KEY_SCHEMA = json.dumps(
    {
        "name": "testrecordkey",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [{"name": "Key1", "type": "long"}, {"name": "Key2", "type": "long"}],
    }
)

# Ranges kgen draws each key field from, keyed by fully-qualified field name.
KEY_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecordkey::Key1": [0, 100],
        "com.acme.avro.testrecordkey::Key2": [0, 250000],
    }
)

# Avro schema for the record *values*: two long fields plus a deliberately
# deeply nested record tree (OuterRecord -> Record1/Record2 -> four inner
# records, each holding a single long "Point") — presumably to exercise
# decoding of nested Avro records.
VALUE_SCHEMA = json.dumps(
    {
        "name": "testrecord",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [
            {"name": "Key1Unused", "type": "long"},
            {"name": "Key2Unused", "type": "long"},
            {
                "name": "OuterRecord",
                "type": {
                    "name": "OuterRecord",
                    "type": "record",
                    "fields": [
                        {
                            "name": "Record1",
                            "type": {
                                "name": "Record1",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord1",
                                        "type": {
                                            "name": "InnerRecord1",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord2",
                                        "type": {
                                            "name": "InnerRecord2",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                        {
                            "name": "Record2",
                            "type": {
                                "name": "Record2",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord3",
                                        "type": {
                                            "name": "InnerRecord3",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord4",
                                        "type": {
                                            "name": "InnerRecord4",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                    ],
                },
            },
        ],
    }
)

# Ranges for the leaf fields of the value schema.
# NOTE(review): InnerRecord4's upper bound (10_000_000_000) is 10x the other
# inner records' (1_000_000_000) — looks like it could be a typo; confirm
# whether the asymmetry is intentional before changing it.
VALUE_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecord::Key1Unused": [0, 100],
        "com.acme.avro.testrecord::Key2Unused": [0, 250000],
        "com.acme.avro.InnerRecord1::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord2::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord3::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord4::Point": [10000, 10000000000],
    }
)

if __name__ == "__main__":
    main()
Functions
def main() ‑> None
-
Expand source code Browse git
def main() -> None: parser = argparse.ArgumentParser() parser.add_argument( "--confluent-host", default="confluent", help="The hostname of a machine running the Confluent Platform", ) parser.add_argument( "-n", "--trials", default=1, type=int, help="Number of measurements to take", ) parser.add_argument( "-r", "--records", default=1000000, type=int, help="Number of Avro records to generate", ) args = parser.parse_args() os.chdir(MZ_ROOT) coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED") repo = mzbuild.Repository(MZ_ROOT, coverage=coverage) wait_for_confluent(args.confluent_host) images = ["kgen", "materialized"] deps = repo.resolve_dependencies([repo.images[name] for name in images]) deps.acquire() docker_client = docker.from_env() # NOTE: We have to override the type below because if `detach=True` it # returns a Container, and the typechecker doesn't know that. mz_container: Container = cast( Container, docker_client.containers.run( deps["materialized"].spec(), detach=True, network_mode="host", ), ) docker_client.containers.run( deps["kgen"].spec(), [ f"--num-records={args.records}", f"--bootstrap-server={args.confluent_host}:9092", f"--schema-registry-url=http://{args.confluent_host}:8081", "--topic=bench_data", "--keys=avro", "--values=avro", f"--avro-schema={VALUE_SCHEMA}", f"--avro-distribution={VALUE_DISTRIBUTION}", f"--avro-key-schema={KEY_SCHEMA}", f"--avro-key-distribution={KEY_DISTRIBUTION}", ], network_mode="host", ) conn = pg8000.connect(host="localhost", port=6875, user="materialize") conn.autocommit = True with conn.cursor() as cur: cur.execute( f"""CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL 'http://{args.confluent_host}:8081' )""" ) cur.execute( f"""CREATE CONNECTION kafka_conn TO KAFKA (BROKER '{args.confluent_host}:9092', SECURITY PROTOCOL PLAINTEXT)""" ) cur.execute( """CREATE SOURCE src FROM KAFKA CONNECTION kafka_conn (TOPIC 'bench_data') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn""" ) 
results_file = open("results.csv", "w") print("Rss,Vms,User Cpu,System Cpu,Wall Time", file=results_file, flush=True) prev = PrevStats(time.time(), 0.0, 0.0) for _ in range(args.trials): cur.execute("DROP VIEW IF EXISTS cnt") cur.execute("CREATE MATERIALIZED VIEW cnt AS SELECT count(*) FROM src") while True: try: cur.execute("SELECT * FROM cnt") row = cur.fetchone() assert row is not None n = row[0] if n >= args.records: break except ProgrammingError: pass time.sleep(1) prev = print_stats(mz_container, prev, results_file)
def mz_proc(container: docker.models.containers.Container) ‑> psutil.Process
-
Expand source code Browse git
def mz_proc(container: Container) -> psutil.Process: container.reload() pid = container.attrs["State"]["Pid"] # type: ignore docker_init = psutil.Process(pid) for child in docker_init.children(recursive=True): if child.name() == "materialized": return child raise RuntimeError("Couldn't find materialized pid")
def print_stats(container: docker.models.containers.Container, prev: PrevStats, file: IO) ‑> PrevStats
-
Expand source code Browse git
def print_stats(container: Container, prev: PrevStats, file: IO) -> PrevStats: proc = mz_proc(container) memory = proc.memory_info() cpu = proc.cpu_times() new_prev = PrevStats(time.time(), cpu.user, cpu.system) print( f"{memory.rss},{memory.vms},{new_prev.user_cpu - prev.user_cpu},{new_prev.system_cpu - prev.system_cpu},{new_prev.wall_time - prev.wall_time}", file=file, flush=True, ) return new_prev
def wait_for_confluent(host: str) ‑> None
-
Expand source code Browse git
def wait_for_confluent(host: str) -> None: url = f"http://{host}:8081/subjects" while True: try: print(f"Checking if schema registry at {url} is accessible...") r = requests.get(url) if r.status_code == 200: print("Schema registry is ready") return except requests.exceptions.ConnectionError as e: print(e) time.sleep(5)
Classes
class PrevStats (wall_time: float, user_cpu: float, system_cpu: float)
-
PrevStats(wall_time, user_cpu, system_cpu)
Expand source code Browse git
class PrevStats(NamedTuple): wall_time: float user_cpu: float system_cpu: float
Ancestors
- builtins.tuple
Instance variables
var system_cpu : float
-
Alias for field number 2
var user_cpu : float
-
Alias for field number 1
var wall_time : float
-
Alias for field number 0