Module materialize.scalability.endpoints
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import pg8000
from materialize import git
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.postgres import Postgres
from materialize.scalability.endpoint import Endpoint
POSTGRES_ENDPOINT_NAME = "postgres"
TARGET_MATERIALIZE_LOCAL = "local"
TARGET_MATERIALIZE_REMOTE = "remote"
TARGET_POSTGRES = "postgres"
TARGET_HEAD = "HEAD"
class MaterializeRemote(Endpoint):
"""Connect to a remote Materialize instance using a psql URL"""
def __init__(self, materialize_url: str) -> None:
super().__init__(specified_target=TARGET_MATERIALIZE_REMOTE)
self.materialize_url = materialize_url
def url(self) -> str:
return self.materialize_url
def up(self) -> None:
pass
def __str__(self) -> str:
return f"MaterializeRemote ({self.materialize_url})"
class PostgresContainer(Endpoint):
def __init__(self, composition: Composition) -> None:
self.composition = composition
self._port: int | None = None
super().__init__(specified_target=TARGET_POSTGRES)
def host(self) -> str:
return "localhost"
def port(self) -> int:
assert self._port is not None
return self._port
def user(self) -> str:
return "postgres"
def password(self) -> str:
return "postgres"
def up(self) -> None:
self.composition.down(destroy_volumes=True)
with self.composition.override(Postgres()):
self.composition.up("postgres")
self._port = self.composition.default_port("postgres")
def try_load_version(self) -> str:
return POSTGRES_ENDPOINT_NAME
def __str__(self) -> str:
return "PostgresContainer"
class MaterializeNonRemote(Endpoint):
def __init__(self, specified_target: str):
super().__init__(specified_target)
def host(self) -> str:
return "localhost"
def internal_host(self) -> str:
return "localhost"
def user(self) -> str:
return "materialize"
def password(self) -> str:
return "materialize"
def internal_port(self) -> int:
raise NotImplementedError
def lift_limits(self) -> None:
priv_conn = pg8000.connect(
host=self.internal_host(), user="mz_system", port=self.internal_port()
)
priv_conn.autocommit = True
priv_cursor = priv_conn.cursor()
priv_cursor.execute("ALTER SYSTEM SET max_connections = 65535;")
priv_cursor.execute("ALTER SYSTEM SET max_tables = 65535;")
priv_cursor.execute("ALTER SYSTEM SET max_materialized_views = 65535;")
class MaterializeLocal(MaterializeNonRemote):
"""Connect to a Materialize instance running on the local host"""
def __init__(self) -> None:
super().__init__(specified_target=TARGET_MATERIALIZE_LOCAL)
def port(self) -> int:
return 6875
def internal_port(self) -> int:
return 6877
def up(self) -> None:
self.lift_limits()
def __str__(self) -> str:
return f"MaterializeLocal ({self.host()})"
class MaterializeContainer(MaterializeNonRemote):
def __init__(
self,
composition: Composition,
specified_target: str,
resolved_target: str,
use_balancerd: bool,
image: str | None = None,
alternative_image: str | None = None,
) -> None:
self.composition = composition
self.image = image
self.alternative_image = (
alternative_image if image != alternative_image else None
)
self._port: int | None = None
self._resolved_target = resolved_target
self.use_balancerd = use_balancerd
super().__init__(specified_target)
def resolved_target(self) -> str:
return self._resolved_target
def port(self) -> int:
assert self._port is not None
return self._port
def internal_port(self) -> int:
return self.composition.port("materialized", 6877)
def up(self) -> None:
self.composition.down(destroy_volumes=True)
print(f"Image is {self.image} (alternative: {self.alternative_image})")
if self.image is not None and self.alternative_image is not None:
if not self.composition.try_pull_service_image(
Materialized(image=self.image)
):
# explicitly specified image cannot be found and alternative exists
print(
f"Unable to find image {self.image}, proceeding with alternative image {self.alternative_image}!"
)
self.image = self.alternative_image
else:
print(f"Found image {self.image}, proceeding with this image.")
self.up_internal()
self.lift_limits()
def up_internal(self) -> None:
with self.composition.override(
Materialized(image=self.image, sanity_restart=False)
):
self.composition.up("materialized")
if self.use_balancerd:
self.composition.up("balancerd")
self._port = self.composition.default_port("balancerd")
else:
self._port = self.composition.default_port("materialized")
def __str__(self) -> str:
return f"MaterializeContainer ({self.image} specified as {self.specified_target()})"
def endpoint_name_to_description(endpoint_name: str) -> str:
if endpoint_name == POSTGRES_ENDPOINT_NAME:
return endpoint_name
commit_sha = endpoint_name.split(" ")[1].strip("()")
# empty when mz_version() reports a Git SHA that is not available in the current repository
commit_message = git.get_commit_message(commit_sha) or "unknown"
return f"{endpoint_name} - {commit_message}"
Functions
def endpoint_name_to_description(endpoint_name: str) ‑> str
-
Expand source code Browse git
def endpoint_name_to_description(endpoint_name: str) -> str: if endpoint_name == POSTGRES_ENDPOINT_NAME: return endpoint_name commit_sha = endpoint_name.split(" ")[1].strip("()") # empty when mz_version() reports a Git SHA that is not available in the current repository commit_message = git.get_commit_message(commit_sha) or "unknown" return f"{endpoint_name} - {commit_message}"
Classes
class MaterializeContainer (composition: Composition, specified_target: str, resolved_target: str, use_balancerd: bool, image: str | None = None, alternative_image: str | None = None)
-
Expand source code Browse git
class MaterializeContainer(MaterializeNonRemote): def __init__( self, composition: Composition, specified_target: str, resolved_target: str, use_balancerd: bool, image: str | None = None, alternative_image: str | None = None, ) -> None: self.composition = composition self.image = image self.alternative_image = ( alternative_image if image != alternative_image else None ) self._port: int | None = None self._resolved_target = resolved_target self.use_balancerd = use_balancerd super().__init__(specified_target) def resolved_target(self) -> str: return self._resolved_target def port(self) -> int: assert self._port is not None return self._port def internal_port(self) -> int: return self.composition.port("materialized", 6877) def up(self) -> None: self.composition.down(destroy_volumes=True) print(f"Image is {self.image} (alternative: {self.alternative_image})") if self.image is not None and self.alternative_image is not None: if not self.composition.try_pull_service_image( Materialized(image=self.image) ): # explicitly specified image cannot be found and alternative exists print( f"Unable to find image {self.image}, proceeding with alternative image {self.alternative_image}!" ) self.image = self.alternative_image else: print(f"Found image {self.image}, proceeding with this image.") self.up_internal() self.lift_limits() def up_internal(self) -> None: with self.composition.override( Materialized(image=self.image, sanity_restart=False) ): self.composition.up("materialized") if self.use_balancerd: self.composition.up("balancerd") self._port = self.composition.default_port("balancerd") else: self._port = self.composition.default_port("materialized") def __str__(self) -> str: return f"MaterializeContainer ({self.image} specified as {self.specified_target()})"
Ancestors
Methods
def internal_port(self) ‑> int
-
Expand source code Browse git
def internal_port(self) -> int: return self.composition.port("materialized", 6877)
def port(self) ‑> int
-
Expand source code Browse git
def port(self) -> int: assert self._port is not None return self._port
def resolved_target(self) ‑> str
-
Expand source code Browse git
def resolved_target(self) -> str: return self._resolved_target
def up(self) ‑> None
-
Expand source code Browse git
def up(self) -> None: self.composition.down(destroy_volumes=True) print(f"Image is {self.image} (alternative: {self.alternative_image})") if self.image is not None and self.alternative_image is not None: if not self.composition.try_pull_service_image( Materialized(image=self.image) ): # explicitly specified image cannot be found and alternative exists print( f"Unable to find image {self.image}, proceeding with alternative image {self.alternative_image}!" ) self.image = self.alternative_image else: print(f"Found image {self.image}, proceeding with this image.") self.up_internal() self.lift_limits()
def up_internal(self) ‑> None
-
Expand source code Browse git
def up_internal(self) -> None: with self.composition.override( Materialized(image=self.image, sanity_restart=False) ): self.composition.up("materialized") if self.use_balancerd: self.composition.up("balancerd") self._port = self.composition.default_port("balancerd") else: self._port = self.composition.default_port("materialized")
Inherited members
class MaterializeLocal
-
Connect to a Materialize instance running on the local host
Expand source code Browse git
class MaterializeLocal(MaterializeNonRemote): """Connect to a Materialize instance running on the local host""" def __init__(self) -> None: super().__init__(specified_target=TARGET_MATERIALIZE_LOCAL) def port(self) -> int: return 6875 def internal_port(self) -> int: return 6877 def up(self) -> None: self.lift_limits() def __str__(self) -> str: return f"MaterializeLocal ({self.host()})"
Ancestors
Methods
def internal_port(self) ‑> int
-
Expand source code Browse git
def internal_port(self) -> int: return 6877
def port(self) ‑> int
-
Expand source code Browse git
def port(self) -> int: return 6875
def up(self) ‑> None
-
Expand source code Browse git
def up(self) -> None: self.lift_limits()
Inherited members
class MaterializeNonRemote (specified_target: str)
-
Expand source code Browse git
class MaterializeNonRemote(Endpoint): def __init__(self, specified_target: str): super().__init__(specified_target) def host(self) -> str: return "localhost" def internal_host(self) -> str: return "localhost" def user(self) -> str: return "materialize" def password(self) -> str: return "materialize" def internal_port(self) -> int: raise NotImplementedError def lift_limits(self) -> None: priv_conn = pg8000.connect( host=self.internal_host(), user="mz_system", port=self.internal_port() ) priv_conn.autocommit = True priv_cursor = priv_conn.cursor() priv_cursor.execute("ALTER SYSTEM SET max_connections = 65535;") priv_cursor.execute("ALTER SYSTEM SET max_tables = 65535;") priv_cursor.execute("ALTER SYSTEM SET max_materialized_views = 65535;")
Ancestors
Subclasses
Methods
def host(self) ‑> str
-
Expand source code Browse git
def host(self) -> str: return "localhost"
def internal_host(self) ‑> str
-
Expand source code Browse git
def internal_host(self) -> str: return "localhost"
def internal_port(self) ‑> int
-
Expand source code Browse git
def internal_port(self) -> int: raise NotImplementedError
def lift_limits(self) ‑> None
-
Expand source code Browse git
def lift_limits(self) -> None: priv_conn = pg8000.connect( host=self.internal_host(), user="mz_system", port=self.internal_port() ) priv_conn.autocommit = True priv_cursor = priv_conn.cursor() priv_cursor.execute("ALTER SYSTEM SET max_connections = 65535;") priv_cursor.execute("ALTER SYSTEM SET max_tables = 65535;") priv_cursor.execute("ALTER SYSTEM SET max_materialized_views = 65535;")
def password(self) ‑> str
-
Expand source code Browse git
def password(self) -> str: return "materialize"
def user(self) ‑> str
-
Expand source code Browse git
def user(self) -> str: return "materialize"
Inherited members
class MaterializeRemote (materialize_url: str)
-
Connect to a remote Materialize instance using a psql URL
Expand source code Browse git
class MaterializeRemote(Endpoint): """Connect to a remote Materialize instance using a psql URL""" def __init__(self, materialize_url: str) -> None: super().__init__(specified_target=TARGET_MATERIALIZE_REMOTE) self.materialize_url = materialize_url def url(self) -> str: return self.materialize_url def up(self) -> None: pass def __str__(self) -> str: return f"MaterializeRemote ({self.materialize_url})"
Ancestors
Methods
def up(self) ‑> None
-
Expand source code Browse git
def up(self) -> None: pass
def url(self) ‑> str
-
Expand source code Browse git
def url(self) -> str: return self.materialize_url
Inherited members
class PostgresContainer (composition: Composition)
-
Expand source code Browse git
class PostgresContainer(Endpoint): def __init__(self, composition: Composition) -> None: self.composition = composition self._port: int | None = None super().__init__(specified_target=TARGET_POSTGRES) def host(self) -> str: return "localhost" def port(self) -> int: assert self._port is not None return self._port def user(self) -> str: return "postgres" def password(self) -> str: return "postgres" def up(self) -> None: self.composition.down(destroy_volumes=True) with self.composition.override(Postgres()): self.composition.up("postgres") self._port = self.composition.default_port("postgres") def try_load_version(self) -> str: return POSTGRES_ENDPOINT_NAME def __str__(self) -> str: return "PostgresContainer"
Ancestors
Methods
def host(self) ‑> str
-
Expand source code Browse git
def host(self) -> str: return "localhost"
def password(self) ‑> str
-
Expand source code Browse git
def password(self) -> str: return "postgres"
def port(self) ‑> int
-
Expand source code Browse git
def port(self) -> int: assert self._port is not None return self._port
def up(self) ‑> None
-
Expand source code Browse git
def up(self) -> None: self.composition.down(destroy_volumes=True) with self.composition.override(Postgres()): self.composition.up("postgres") self._port = self.composition.default_port("postgres")
def user(self) ‑> str
-
Expand source code Browse git
def user(self) -> str: return "postgres"
Inherited members