Module materialize.buildkite_insights.cache.generic_cache
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from materialize import MZ_ROOT
from materialize.buildkite_insights.cache.cache_constants import (
FetchMode,
)
from materialize.buildkite_insights.util.data_io import (
FilePath,
exists_file,
exists_file_with_recent_data,
get_last_modification_date,
read_results_from_file,
write_results_to_file,
)
from materialize.util import ensure_dir_exists
# Base directory for all cache files; CacheFilePath places each cache item
# type in its own subdirectory below this path.
PATH_TO_CACHE_DIR = MZ_ROOT / "temp"
@dataclass
class CacheFilePath(FilePath):
    """Path of a single cache file below ``PATH_TO_CACHE_DIR``.

    The file lives in a per-item-type directory and is named after the
    pipeline slug, the params hash, and the file extension.
    """

    # Name of the subdirectory (below the cache root) holding this item.
    cache_item_type: str
    # Leading part of the file name.
    pipeline_slug: str
    # Part of the file name that follows "-params-".
    params_hash: str
    # File suffix, without the leading dot.
    file_extension: str = "json"

    def get_path_to_directory(self):
        """Return the directory that contains this cache file."""
        cache_root = PATH_TO_CACHE_DIR
        return f"{cache_root}/{self.cache_item_type}"

    def get(self) -> str:
        """Return the full path of the cache file as a string."""
        directory = self.get_path_to_directory()
        file_name = (
            f"{self.pipeline_slug}-params-{self.params_hash}.{self.file_extension}"
        )
        return f"{directory}/{file_name}"
def get_or_query_data(
    cache_file_path: CacheFilePath,
    fetch_action: Callable[[], Any],
    fetch_mode: FetchMode,
    max_allowed_cache_age_in_hours: int | None = 12,
    add_to_cache_if_not_present: bool = True,
    quiet_mode: bool = False,
) -> Any:
    """Return data from the cache file or, depending on the mode, fetch it.

    The directory of ``cache_file_path`` is created first if necessary.

    Args:
        cache_file_path: Location of the cache file to read and/or write.
        fetch_action: Zero-argument callable invoked to obtain fresh data.
        fetch_mode: Controls when the cache file is used:
            ``FetchMode.NEVER`` always reads the file,
            ``FetchMode.AVOID`` reads the file if it exists,
            ``FetchMode.AUTO`` reads the file if
            ``exists_file_with_recent_data`` accepts it;
            any other mode always calls ``fetch_action``.
        max_allowed_cache_age_in_hours: Age limit handed to
            ``exists_file_with_recent_data`` in ``FetchMode.AUTO``.
        add_to_cache_if_not_present: Whether freshly fetched data is
            written to the cache file.
        quiet_mode: Suppresses the "Using existing data" message and is
            forwarded to the read/write helpers.

    Returns:
        The content of the cache file or the result of ``fetch_action``.

    Raises:
        RuntimeError: If ``fetch_mode`` is ``FetchMode.NEVER`` and the
            cache file does not exist.
    """
    ensure_dir_exists(cache_file_path.get_path_to_directory())

    # Decide per mode whether the existing cache file satisfies the request.
    if fetch_mode == FetchMode.NEVER:
        if not exists_file(cache_file_path):
            raise RuntimeError(f"File missing: {cache_file_path}")
        use_cache = True
    elif fetch_mode == FetchMode.AVOID:
        use_cache = exists_file(cache_file_path)
    elif fetch_mode == FetchMode.AUTO:
        use_cache = exists_file_with_recent_data(
            cache_file_path, max_allowed_cache_age_in_hours
        )
    else:
        use_cache = False

    if not use_cache:
        fresh_data = fetch_action()
        if add_to_cache_if_not_present:
            write_results_to_file(fresh_data, cache_file_path, quiet_mode=quiet_mode)
        return fresh_data

    if not quiet_mode:
        downloaded_at = get_last_modification_date(cache_file_path)
        print(f"Using existing data: {cache_file_path} (downloaded: {downloaded_at})")

    return read_results_from_file(cache_file_path, quiet_mode=quiet_mode)
Functions
def get_or_query_data(cache_file_path: CacheFilePath, fetch_action: collections.abc.Callable[[], typing.Any], fetch_mode: FetchMode, max_allowed_cache_age_in_hours: int | None = 12, add_to_cache_if_not_present: bool = True, quiet_mode: bool = False) ‑> Any
-
Expand source code Browse git
def get_or_query_data(
    cache_file_path: CacheFilePath,
    fetch_action: Callable[[], Any],
    fetch_mode: FetchMode,
    max_allowed_cache_age_in_hours: int | None = 12,
    add_to_cache_if_not_present: bool = True,
    quiet_mode: bool = False,
) -> Any:
    """Return data from the cache file or, depending on the mode, fetch it.

    The directory of ``cache_file_path`` is created first if necessary.

    Args:
        cache_file_path: Location of the cache file to read and/or write.
        fetch_action: Zero-argument callable invoked to obtain fresh data.
        fetch_mode: Controls when the cache file is used:
            ``FetchMode.NEVER`` always reads the file,
            ``FetchMode.AVOID`` reads the file if it exists,
            ``FetchMode.AUTO`` reads the file if
            ``exists_file_with_recent_data`` accepts it;
            any other mode always calls ``fetch_action``.
        max_allowed_cache_age_in_hours: Age limit handed to
            ``exists_file_with_recent_data`` in ``FetchMode.AUTO``.
        add_to_cache_if_not_present: Whether freshly fetched data is
            written to the cache file.
        quiet_mode: Suppresses the "Using existing data" message and is
            forwarded to the read/write helpers.

    Returns:
        The content of the cache file or the result of ``fetch_action``.

    Raises:
        RuntimeError: If ``fetch_mode`` is ``FetchMode.NEVER`` and the
            cache file does not exist.
    """
    ensure_dir_exists(cache_file_path.get_path_to_directory())

    # Decide per mode whether the existing cache file satisfies the request.
    if fetch_mode == FetchMode.NEVER:
        if not exists_file(cache_file_path):
            raise RuntimeError(f"File missing: {cache_file_path}")
        use_cache = True
    elif fetch_mode == FetchMode.AVOID:
        use_cache = exists_file(cache_file_path)
    elif fetch_mode == FetchMode.AUTO:
        use_cache = exists_file_with_recent_data(
            cache_file_path, max_allowed_cache_age_in_hours
        )
    else:
        use_cache = False

    if not use_cache:
        fresh_data = fetch_action()
        if add_to_cache_if_not_present:
            write_results_to_file(fresh_data, cache_file_path, quiet_mode=quiet_mode)
        return fresh_data

    if not quiet_mode:
        downloaded_at = get_last_modification_date(cache_file_path)
        print(f"Using existing data: {cache_file_path} (downloaded: {downloaded_at})")

    return read_results_from_file(cache_file_path, quiet_mode=quiet_mode)
Classes
class CacheFilePath (cache_item_type: str, pipeline_slug: str, params_hash: str, file_extension: str = 'json')
-
CacheFilePath(cache_item_type: str, pipeline_slug: str, params_hash: str, file_extension: str = 'json')
Expand source code Browse git
@dataclass
class CacheFilePath(FilePath):
    """Path of a single cache file below ``PATH_TO_CACHE_DIR``.

    The file lives in a per-item-type directory and is named after the
    pipeline slug, the params hash, and the file extension.
    """

    # Name of the subdirectory (below the cache root) holding this item.
    cache_item_type: str
    # Leading part of the file name.
    pipeline_slug: str
    # Part of the file name that follows "-params-".
    params_hash: str
    # File suffix, without the leading dot.
    file_extension: str = "json"

    def get_path_to_directory(self):
        """Return the directory that contains this cache file."""
        cache_root = PATH_TO_CACHE_DIR
        return f"{cache_root}/{self.cache_item_type}"

    def get(self) -> str:
        """Return the full path of the cache file as a string."""
        directory = self.get_path_to_directory()
        file_name = (
            f"{self.pipeline_slug}-params-{self.params_hash}.{self.file_extension}"
        )
        return f"{directory}/{file_name}"
Ancestors
Class variables
var cache_item_type : str
var file_extension : str
var params_hash : str
var pipeline_slug : str
Methods
def get(self) ‑> str
-
Expand source code Browse git
def get(self) -> str:
    """Return the full path of the cache file as a string."""
    directory = self.get_path_to_directory()
    file_name = (
        f"{self.pipeline_slug}-params-{self.params_hash}.{self.file_extension}"
    )
    return f"{directory}/{file_name}"
def get_path_to_directory(self)
-
Expand source code Browse git
def get_path_to_directory(self):
    """Return the directory that contains this cache file."""
    cache_root = PATH_TO_CACHE_DIR
    return f"{cache_root}/{self.cache_item_type}"