Module materialize.util

Various utilities

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Various utilities"""

from __future__ import annotations

import filecmp
import hashlib
import json
import os
import pathlib
import random
import subprocess
from collections.abc import Iterator
from enum import Enum
from pathlib import Path
from threading import Thread
from typing import Protocol, TypeVar

import xxhash
import zstandard

# Base path read from the MZ_ROOT environment variable at import time.
# Importing this module raises KeyError when the variable is not set.
MZ_ROOT = Path(os.environ["MZ_ROOT"])


def nonce(digits: int) -> str:
    """Return a random string of `digits` lowercase hexadecimal characters.

    The characters are drawn with the `random` module, one call to
    `random.choice` per character.
    """
    alphabet = "0123456789abcdef"
    picked = []
    for _ in range(digits):
        picked.append(random.choice(alphabet))
    return "".join(picked)


# Unconstrained type variable; ties the argument type of all_subclasses to its result.
T = TypeVar("T")


def all_subclasses(cls: type[T]) -> set[type[T]]:
    """Collect every direct and indirect subclass of `cls` into a set.

    `cls` itself is not part of the result.
    """
    found: set[type[T]] = set()
    for child in cls.__subclasses__():
        found.add(child)
        # Descend so that grandchildren and deeper levels are included too.
        found.update(all_subclasses(child))
    return found


# Cache for naughty_strings(); stays None until the file has been loaded once.
NAUGHTY_STRINGS = None


def naughty_strings() -> list[str]:
    """Return the parsed contents of blns.json, loading it on first use.

    The file is read from MZ_ROOT/misc/python/materialize/blns.json and the
    parsed result is cached in the module-level NAUGHTY_STRINGS, so later
    calls return the same object without touching the file system.
    """
    # Naughty strings taken from https://github.com/minimaxir/big-list-of-naughty-strings
    # Under MIT license, Copyright (c) 2015-2020 Max Woolf
    global NAUGHTY_STRINGS
    # Test against None (not truthiness) so an empty result is cached as well.
    if NAUGHTY_STRINGS is None:
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(
            MZ_ROOT / "misc" / "python" / "materialize" / "blns.json",
            encoding="utf-8",
        ) as f:
            NAUGHTY_STRINGS = json.load(f)
    return NAUGHTY_STRINGS


class YesNoOnce(Enum):
    """Enumeration with the three members YES, NO and ONCE."""

    YES = 1
    NO = 2
    ONCE = 3


class PropagatingThread(Thread):
    """Thread whose join() re-raises the target's exception or returns its result.

    The target's return value is stored in `ret` and any exception it raises
    (including BaseException subclasses) is stored in `exc`.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as threading.Thread."""
        super().__init__(*args, **kwargs)
        # Created up front so join() can read them even when it returns (for
        # example on timeout) before run() had a chance to execute.
        self.exc = None
        self.ret = None

    def run(self):
        """Invoke the target, capturing its result or exception."""
        self.exc = None
        try:
            # Mirror Thread.run(): a thread without a target does nothing.
            if self._target is not None:  # type: ignore
                self.ret = self._target(*self._args, **self._kwargs)  # type: ignore
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        """Wait for the thread, then propagate its outcome.

        :param timeout: passed through to Thread.join()
        :return: the target's return value, or None if none has been recorded
        :raises BaseException: whatever the target raised, re-raised as-is
        """
        super().join(timeout)
        if self.exc is not None:
            raise self.exc
        return self.ret


def decompress_zst_to_directory(
    zst_file_path: str, destination_dir_path: str
) -> list[str]:
    """
    Decompress one zstd-compressed file into the given directory.

    The output file is named after the input file with its last suffix removed.

    :return: file paths in destination dir
    """
    source = pathlib.Path(zst_file_path)
    target = pathlib.Path(destination_dir_path) / source.stem

    with open(source, "rb") as compressed:
        stream_decompressor = zstandard.ZstdDecompressor()
        with open(target, "wb") as destination:
            stream_decompressor.copy_stream(compressed, destination)

    # Exactly one file is produced per call.
    return [str(target)]


def ensure_dir_exists(path_to_dir: str) -> None:
    """Create the directory `path_to_dir`, including missing parents.

    Nothing happens when the directory already exists. The directory is
    created in-process instead of spawning an external `mkdir -p`, so the call
    does not depend on a `mkdir` binary and a path starting with `-` cannot be
    mistaken for a command-line option.

    :raises OSError: if the directory cannot be created, e.g. because a
        non-directory already exists at that path
    """
    Path(path_to_dir).mkdir(parents=True, exist_ok=True)


def sha256_of_file(path: str | Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at `path`.

    The file is read in chunks of `filecmp.BUFSIZE` bytes rather than all at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            chunk = stream.read(filecmp.BUFSIZE)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def sha256_of_utf8_string(value: str) -> str:
    """Return the hexadecimal SHA-256 digest of `value` encoded as UTF-8."""
    encoded = value.encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()


def stable_int_hash(*values: str) -> int:
    """Return an integer xxh64 (seed 0) hash of one or more strings.

    A single value is hashed directly. For any other number of values, each
    one is hashed on its own, the decimal digests are joined with commas, and
    that joined string is hashed.
    """
    if len(values) != 1:
        digests = [str(stable_int_hash(single)) for single in values]
        combined = ",".join(digests)
        return stable_int_hash(combined)

    hasher = xxhash.xxh64(values[0], seed=0)
    return hasher.intdigest()


class HasName(Protocol):
    """Structural type for any object that exposes a string `name` attribute."""

    # The attribute an object must provide to satisfy this protocol.
    name: str


# Type variable restricted to types that satisfy HasName.
U = TypeVar("U", bound=HasName)


def selected_by_name(selected: list[str], objs: list[U]) -> Iterator[U]:
    """Yield, for each name in `selected`, the first object in `objs` with that name.

    Results follow the order of `selected`. This is a generator, so a lookup
    failure surfaces only when iteration reaches the offending name.

    :raises ValueError: if no object in `objs` carries a requested name
    """
    not_found = object()
    for name in selected:
        # next() stops scanning at the first match, like the original inner loop.
        match = next((candidate for candidate in objs if candidate.name == name), not_found)
        if match is not_found:
            known = [obj.name for obj in objs]
            raise ValueError(f"Unknown object with name {name} in {known}")
        yield match

Functions

def all_subclasses(cls: type[T]) ‑> set[type[~T]]

Returns a recursive set of all subclasses of a class

Expand source code Browse git
def all_subclasses(cls: type[T]) -> set[type[T]]:
    """Collect every direct and indirect subclass of `cls` into a set.

    `cls` itself is not part of the result.
    """
    found: set[type[T]] = set()
    for child in cls.__subclasses__():
        found.add(child)
        # Descend so that grandchildren and deeper levels are included too.
        found.update(all_subclasses(child))
    return found
def decompress_zst_to_directory(zst_file_path: str, destination_dir_path: str) ‑> list[str]

:return: file paths in destination dir

Expand source code Browse git
def decompress_zst_to_directory(
    zst_file_path: str, destination_dir_path: str
) -> list[str]:
    """
    Decompress one zstd-compressed file into the given directory.

    The output file is named after the input file with its last suffix removed.

    :return: file paths in destination dir
    """
    source = pathlib.Path(zst_file_path)
    target = pathlib.Path(destination_dir_path) / source.stem

    with open(source, "rb") as compressed:
        stream_decompressor = zstandard.ZstdDecompressor()
        with open(target, "wb") as destination:
            stream_decompressor.copy_stream(compressed, destination)

    # Exactly one file is produced per call.
    return [str(target)]
def ensure_dir_exists(path_to_dir: str) ‑> None
Expand source code Browse git
def ensure_dir_exists(path_to_dir: str) -> None:
    """Create the directory `path_to_dir`, including missing parents.

    Nothing happens when the directory already exists. The directory is
    created in-process instead of spawning an external `mkdir -p`, so the call
    does not depend on a `mkdir` binary and a path starting with `-` cannot be
    mistaken for a command-line option.

    :raises OSError: if the directory cannot be created, e.g. because a
        non-directory already exists at that path
    """
    Path(path_to_dir).mkdir(parents=True, exist_ok=True)
def naughty_strings() ‑> list[str]
Expand source code Browse git
def naughty_strings() -> list[str]:
    """Return the parsed contents of blns.json, loading it on first use.

    The file is read from MZ_ROOT/misc/python/materialize/blns.json and the
    parsed result is cached in the module-level NAUGHTY_STRINGS, so later
    calls return the same object without touching the file system.
    """
    # Naughty strings taken from https://github.com/minimaxir/big-list-of-naughty-strings
    # Under MIT license, Copyright (c) 2015-2020 Max Woolf
    global NAUGHTY_STRINGS
    # Test against None (not truthiness) so an empty result is cached as well.
    if NAUGHTY_STRINGS is None:
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(
            MZ_ROOT / "misc" / "python" / "materialize" / "blns.json",
            encoding="utf-8",
        ) as f:
            NAUGHTY_STRINGS = json.load(f)
    return NAUGHTY_STRINGS
def nonce(digits: int) ‑> str
Expand source code Browse git
def nonce(digits: int) -> str:
    """Return a random string of `digits` lowercase hexadecimal characters.

    The characters are drawn with the `random` module, one call to
    `random.choice` per character.
    """
    alphabet = "0123456789abcdef"
    picked = []
    for _ in range(digits):
        picked.append(random.choice(alphabet))
    return "".join(picked)
def selected_by_name(selected: list[str], objs: list[U]) ‑> collections.abc.Iterator[~U]
Expand source code Browse git
def selected_by_name(selected: list[str], objs: list[U]) -> Iterator[U]:
    """Yield, for each name in `selected`, the first object in `objs` with that name.

    Results follow the order of `selected`. This is a generator, so a lookup
    failure surfaces only when iteration reaches the offending name.

    :raises ValueError: if no object in `objs` carries a requested name
    """
    not_found = object()
    for name in selected:
        # next() stops scanning at the first match, like the original inner loop.
        match = next((candidate for candidate in objs if candidate.name == name), not_found)
        if match is not_found:
            known = [obj.name for obj in objs]
            raise ValueError(f"Unknown object with name {name} in {known}")
        yield match
def sha256_of_file(path: str | Path) ‑> str
Expand source code Browse git
def sha256_of_file(path: str | Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at `path`.

    The file is read in chunks of `filecmp.BUFSIZE` bytes rather than all at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            chunk = stream.read(filecmp.BUFSIZE)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def sha256_of_utf8_string(value: str) ‑> str
Expand source code Browse git
def sha256_of_utf8_string(value: str) -> str:
    """Return the hexadecimal SHA-256 digest of `value` encoded as UTF-8."""
    encoded = value.encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
def stable_int_hash(*values: str) ‑> int
Expand source code Browse git
def stable_int_hash(*values: str) -> int:
    """Return an integer xxh64 (seed 0) hash of one or more strings.

    A single value is hashed directly. For any other number of values, each
    one is hashed on its own, the decimal digests are joined with commas, and
    that joined string is hashed.
    """
    if len(values) != 1:
        digests = [str(stable_int_hash(single)) for single in values]
        combined = ",".join(digests)
        return stable_int_hash(combined)

    hasher = xxhash.xxh64(values[0], seed=0)
    return hasher.intdigest()

Classes

class HasName (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
Expand source code Browse git
class HasName(Protocol):
    """Structural type for any object that exposes a string `name` attribute."""

    # The attribute an object must provide to satisfy this protocol.
    name: str

Ancestors

  • typing.Protocol
  • typing.Generic

Class variables

var name : str
class PropagatingThread (group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Expand source code Browse git
class PropagatingThread(Thread):
    """Thread whose join() re-raises the target's exception or returns its result.

    The target's return value is stored in `ret` and any exception it raises
    (including BaseException subclasses) is stored in `exc`.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as threading.Thread."""
        super().__init__(*args, **kwargs)
        # Created up front so join() can read them even when it returns (for
        # example on timeout) before run() had a chance to execute.
        self.exc = None
        self.ret = None

    def run(self):
        """Invoke the target, capturing its result or exception."""
        self.exc = None
        try:
            # Mirror Thread.run(): a thread without a target does nothing.
            if self._target is not None:  # type: ignore
                self.ret = self._target(*self._args, **self._kwargs)  # type: ignore
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        """Wait for the thread, then propagate its outcome.

        :param timeout: passed through to Thread.join()
        :return: the target's return value, or None if none has been recorded
        :raises BaseException: whatever the target raised, re-raised as-is
        """
        super().join(timeout)
        if self.exc is not None:
            raise self.exc
        return self.ret

Ancestors

  • threading.Thread

Methods

def join(self, timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

Expand source code Browse git
def join(self, timeout=None):
    """Wait for the thread, then propagate the outcome recorded by run().

    :param timeout: passed through to Thread.join()
    :return: the target's return value, or None if none has been recorded
    :raises BaseException: whatever the target raised, re-raised as-is
    """
    super().join(timeout)
    # run() is what assigns `exc` and `ret`; when the timeout expires before
    # run() has executed, neither attribute exists yet, so read them defensively.
    exc = getattr(self, "exc", None)
    if exc is not None:
        raise exc
    return getattr(self, "ret", None)
def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Expand source code Browse git
def run(self):
    """Invoke the thread's target, storing its result in `ret` or its exception in `exc`."""
    self.exc = None
    try:
        # Mirror Thread.run(): a thread created without a target does nothing,
        # instead of recording a TypeError from calling None.
        if self._target is not None:  # type: ignore
            self.ret = self._target(*self._args, **self._kwargs)  # type: ignore
    except BaseException as e:
        # Kept for join() to re-raise in the joining thread.
        self.exc = e
class YesNoOnce (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class YesNoOnce(Enum):
    """Enumeration with the three members YES, NO and ONCE."""

    YES = 1
    NO = 2
    ONCE = 3

Ancestors

  • enum.Enum

Class variables

var NO
var ONCE
var YES