misc.python.materialize.build_config
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
from pathlib import Path
from textwrap import dedent
from typing import Any

import toml


class LocalState:
    """Local state persisted by a tool.

    Users should not expect this state to be durable; it can be blown away
    at any point.

    Stored at: ~/.cache/materialize/build_state.toml
    """

    def __init__(self, path: Path):
        """Load the TOML document at `path`, or start empty if it is not a file."""
        self.path = path
        if path.is_file():
            with open(path) as f:
                self.data = toml.load(f)
        else:
            self.data = {}

    @staticmethod
    def default_path() -> Path:
        """Return the state file location under the user's home directory."""
        return Path.home() / ".cache" / "materialize" / "build_state.toml"

    @classmethod
    def read(cls, namespace: str) -> Any | None:
        """Return the value stored under `namespace`, or None if it is absent.

        The state file is re-read from disk on every call.
        """
        # Go through `cls` so a subclass that overrides `default_path` is honored.
        cache = cls(cls.default_path())
        return cache.data.get(namespace)

    @classmethod
    def write(cls, namespace: str, val: Any) -> None:
        """Store `val` under `namespace` and rewrite the whole state file.

        The current file contents are loaded first, so other namespaces are
        carried over into the rewritten file.
        """
        cache = cls(cls.default_path())
        cache.data[namespace] = val

        # The containing directory may not exist yet on first use.
        cache.path.parent.mkdir(parents=True, exist_ok=True)
        with open(cache.path, "w") as f:
            toml.dump(cache.data, f)


class TeleportLocalState:
    """Per-application entries kept under the "teleport" namespace of LocalState.

    `data` maps an application name to a dict that may hold a "pid" and an
    "address" entry.
    """

    def __init__(self, data: dict[str, Any] | None):
        """Wrap `data`; None (or an empty mapping) yields a fresh empty dict."""
        self.data = data or {}

    @classmethod
    def read(cls) -> "TeleportLocalState":
        """Build an instance from the "teleport" namespace of the state file."""
        return cls(LocalState.read("teleport"))

    def write(self) -> None:
        """Persist `data` under the "teleport" namespace of the state file."""
        LocalState.write("teleport", self.data)

    def get_pid(self, app_name: str) -> str | None:
        """Return the "pid" recorded for `app_name`, or None if there is none."""
        return self.data.get(app_name, {}).get("pid")

    def set_pid(self, app_name: str, pid: str | None) -> None:
        """Record `pid` for `app_name`, creating the entry if needed."""
        self.data.setdefault(app_name, {})["pid"] = pid

    def get_address(self, app_name: str) -> str | None:
        """Return the "address" recorded for `app_name`, or None if there is none."""
        return self.data.get(app_name, {}).get("address")

    def set_address(self, app_name: str, addr: str | None) -> None:
        """Record `addr` for `app_name`, creating the entry if needed."""
        self.data.setdefault(app_name, {})["address"] = addr

    def __str__(self) -> str:
        """Return a short multi-line dump of `data`."""
        return dedent(
            f"""
            TeleportLocalState:
                data: {self.data}
            """
        )
class
LocalState:
19class LocalState: 20 """Local state persisted by a tool. 21 22 Users should not expect this state to be durable, it can be blown away at 23 at point. 24 25 Stored at: ~/.cache/materialize/build_state.toml 26 """ 27 28 def __init__(self, path: Path): 29 self.path = path 30 if path.is_file(): 31 with open(path) as f: 32 self.data = toml.load(f) 33 else: 34 self.data = {} 35 36 @staticmethod 37 def default_path() -> Path: 38 home = Path.home() 39 path = home / ".cache" / "materialize" / "build_state.toml" 40 return path 41 42 @classmethod 43 def read(cls, namespace: str) -> Any | None: 44 cache = LocalState(LocalState.default_path()) 45 return cache.data.get(namespace, None) 46 47 @classmethod 48 def write(cls, namespace: str, val: Any): 49 cache = LocalState(LocalState.default_path()) 50 cache.data[namespace] = val 51 52 Path(os.path.dirname(cache.path)).mkdir(parents=True, exist_ok=True) 53 with open(cache.path, "w+") as f: 54 toml.dump(cache.data, f)
Local state persisted by a tool.
Users should not expect this state to be durable; it can be blown away at any point.
Stored at: ~/.cache/materialize/build_state.toml
class
TeleportLocalState:
57class TeleportLocalState: 58 def __init__(self, data: dict[str, Any] | None): 59 self.data = data or {} 60 61 @classmethod 62 def read(cls): 63 return TeleportLocalState(LocalState.read("teleport")) 64 65 def write(self): 66 LocalState.write("teleport", self.data) 67 68 def get_pid(self, app_name: str) -> str | None: 69 existing = self.data.get(app_name, {}) 70 return existing.get("pid") 71 72 def set_pid(self, app_name: str, pid: str | None): 73 existing = self.data.get(app_name, {}) 74 existing["pid"] = pid 75 self.data[app_name] = existing 76 77 def get_address(self, app_name: str) -> str | None: 78 existing = self.data.get(app_name, {}) 79 return existing.get("address") 80 81 def set_address(self, app_name: str, addr: str | None): 82 existing = self.data.get(app_name, {}) 83 existing["address"] = addr 84 self.data[app_name] = existing 85 86 def __str__(self): 87 return dedent( 88 f""" 89 TeleportLocalState: 90 data: {self.data} 91 """ 92 )