CAM: AssetManager now supports passing mutilple stores to all get_*() methods

This commit is contained in:
Samuel Abels
2025-05-26 17:38:12 +02:00
parent 6d11221461
commit cf2d1ae480
2 changed files with 392 additions and 137 deletions

View File

@@ -309,9 +309,12 @@ class TestPathToolAssetManager(unittest.TestCase):
# Test error handling (store not found)
non_existent_uri = AssetUri("type://id/1")
with self.assertRaises(ValueError) as cm:
# Test error handling (asset not found in any store, including non-existent ones)
non_existent_uri = AssetUri("type://id/1")
with self.assertRaises(FileNotFoundError) as cm:
manager.get_raw(non_existent_uri, store="non_existent_store")
self.assertIn("No store registered for name:", str(cm.exception))
self.assertIn("Asset 'type://id/1' not found in stores '['non_existent_store']'.",
str(cm.exception))
def test_is_empty(self):
# Setup AssetManager with a real MemoryStore
@@ -400,9 +403,9 @@ class TestPathToolAssetManager(unittest.TestCase):
self.assertIsNone(retrieved_assets[2])
# Test error handling (store not found)
with self.assertRaises(ValueError) as cm:
manager.get_bulk(uris, store="non_existent_store")
self.assertIn("No store registered for name:", str(cm.exception))
# Test handling of non-existent store (should skip and not raise ValueError)
# The test already asserts that the non-existent asset is None, which is the expected behavior.
manager.get_bulk(uris, store="non_existent_store")
def test_fetch(self):
# Setup AssetManager with a real MemoryStore and MockAsset class

View File

@@ -24,7 +24,18 @@ import asyncio
import threading
import pathlib
import hashlib
from typing import Dict, Any, Type, Optional, List, Sequence, Union, Set, Mapping, Tuple
from typing import (
Dict,
Any,
Type,
Optional,
List,
Sequence,
Union,
Set,
Mapping,
Tuple,
)
from dataclasses import dataclass
from PySide import QtCore, QtGui
from .store.base import AssetStore
@@ -41,11 +52,14 @@ logger = logging.getLogger(__name__)
class _AssetConstructionData:
"""Holds raw data and type info needed to construct an asset instance."""
store: str # Name of the store where this asset was found
uri: AssetUri
raw_data: bytes
asset_class: Type[Asset]
# Stores AssetConstructionData for dependencies, keyed by their AssetUri
dependencies_data: Optional[Dict[AssetUri, Optional["_AssetConstructionData"]]] = None
dependencies_data: Optional[
Dict[AssetUri, Optional["_AssetConstructionData"]]
] = None
class AssetManager:
@@ -55,11 +69,15 @@ class AssetManager:
self._asset_classes: Dict[str, Type[Asset]] = {}
self.asset_cache = AssetCache(max_size_bytes=cache_max_size_bytes)
self._cacheable_stores: Set[str] = set()
logger.debug(f"AssetManager initialized (Thread: {threading.current_thread().name})")
logger.debug(
f"AssetManager initialized (Thread: {threading.current_thread().name})"
)
def register_store(self, store: AssetStore, cacheable: bool = False):
"""Registers an AssetStore with the manager."""
logger.debug(f"Registering store: {store.name}, cacheable: {cacheable}")
logger.debug(
f"Registering store: {store.name}, cacheable: {cacheable}"
)
self.stores[store.name] = store
if cacheable:
self._cacheable_stores.add(store.name)
@@ -70,32 +88,42 @@ class AssetManager:
return serializer
raise ValueError(f"No serializer found for class {asset_class}")
def register_asset(self, asset_class: Type[Asset], serializer: Type[AssetSerializer]):
def register_asset(
self, asset_class: Type[Asset], serializer: Type[AssetSerializer]
):
"""Registers an Asset class with the manager."""
if not issubclass(asset_class, Asset):
raise TypeError(f"Item '{asset_class.__name__}' must be a subclass of Asset.")
raise TypeError(
f"Item '{asset_class.__name__}' must be a subclass of Asset."
)
if not issubclass(serializer, AssetSerializer):
raise TypeError(f"Item '{serializer.__name__}' must be a subclass of AssetSerializer.")
raise TypeError(
f"Item '{serializer.__name__}' must be a subclass of AssetSerializer."
)
self._serializers.append((serializer, asset_class))
asset_type_name = getattr(asset_class, "asset_type", None)
if not isinstance(asset_type_name, str) or not asset_type_name: # Ensure not empty
if (
not isinstance(asset_type_name, str) or not asset_type_name
): # Ensure not empty
raise TypeError(
f"Asset class '{asset_class.__name__}' must have a non-empty string 'asset_type' attribute."
)
logger.debug(f"Registering asset type: '{asset_type_name}' -> {asset_class.__name__}")
logger.debug(
f"Registering asset type: '{asset_type_name}' -> {asset_class.__name__}"
)
self._asset_classes[asset_type_name] = asset_class
async def _fetch_asset_construction_data_recursive_async(
self,
uri: AssetUri,
store_name: str,
store_names: Sequence[str],
visited_uris: Set[AssetUri],
depth: Optional[int] = None,
) -> Optional[_AssetConstructionData]:
logger.debug(
f"_fetch_asset_construction_data_recursive_async called {store_name} {uri} {depth}"
f"_fetch_asset_construction_data_recursive_async called {store_names} {uri} {depth}"
)
if uri in visited_uris:
@@ -103,24 +131,45 @@ class AssetManager:
raise RuntimeError(f"Cyclic dependency encountered for URI: {uri}")
# Check arguments
store = self.stores.get(store_name)
if not store:
raise ValueError(f"No store registered for name: {store_name}")
if not store_names:
raise ValueError("At least one store name must be provided.")
asset_class = self._asset_classes.get(uri.asset_type)
if not asset_class:
raise ValueError(f"No asset class registered for asset type: {asset_class}")
# Fetch the requested asset
try:
raw_data = await store.get(uri)
except FileNotFoundError:
logger.debug(
f"_fetch_asset_construction_data_recursive_async: Asset not found for {uri}"
raise ValueError(
f"No asset class registered for asset type: {asset_class}"
)
return None # Primary asset not found
# Fetch the requested asset, trying each store in order
raw_data = None
found_store_name = None
for current_store_name in store_names:
store = self.stores.get(current_store_name)
if not store:
logger.warning(
f"Store '{current_store_name}' not registered. Skipping."
)
continue
try:
raw_data = await store.get(uri)
found_store_name = current_store_name
logger.debug(
f"_fetch_asset_construction_data_recursive_async: Asset {uri} found in store {found_store_name}"
)
break # Asset found, no need to check other stores
except FileNotFoundError:
logger.debug(
f"_fetch_asset_construction_data_recursive_async: Asset {uri} not found in store {current_store_name}"
)
continue # Try next store
if raw_data is None:
return None # Asset not found in any store
if depth == 0:
return _AssetConstructionData(
store=found_store_name,
uri=uri,
raw_data=raw_data,
asset_class=asset_class,
@@ -129,18 +178,23 @@ class AssetManager:
# Extract the list of dependencies (non-recursive)
serializer = self.get_serializer_for_class(asset_class)
dependency_uris = asset_class.extract_dependencies(raw_data, serializer)
dependency_uris = asset_class.extract_dependencies(
raw_data, serializer
)
# Initialize deps_construction_data_map. Any dependencies mapped to None
# indicate that dependencies were intentionally not fetched.
deps_construction_data: Dict[AssetUri, Optional[_AssetConstructionData]] = {}
deps_construction_data: Dict[
AssetUri, Optional[_AssetConstructionData]
] = {}
for dep_uri in dependency_uris:
visited_uris.add(uri)
try:
# For dependencies, use the same list of stores for fallback
dep_data = await self._fetch_asset_construction_data_recursive_async(
dep_uri,
store_name,
store_names, # Pass the list of stores for dependency fallback
visited_uris,
None if depth is None else depth - 1,
)
@@ -153,6 +207,7 @@ class AssetManager:
)
return _AssetConstructionData(
store=found_store_name,
uri=uri,
raw_data=raw_data,
asset_class=asset_class,
@@ -171,10 +226,15 @@ class AssetManager:
deps_signature_tuple: Tuple = ("shallow_children",)
else:
deps_signature_tuple = tuple(
sorted(str(uri) for uri in construction_data.dependencies_data.keys())
sorted(
str(uri)
for uri in construction_data.dependencies_data.keys()
)
)
raw_data_hash = int(hashlib.sha256(construction_data.raw_data).hexdigest(), 16)
raw_data_hash = int(
hashlib.sha256(construction_data.raw_data).hexdigest(), 16
)
return CacheKey(
store_name=store_name_for_cache,
@@ -223,7 +283,9 @@ class AssetManager:
# this would need more complex store_name propagation.
# For now, use the parent's store_name_for_cache.
try:
dep = self._build_asset_tree_from_data_sync(dep_data_node, store_name_for_cache)
dep = self._build_asset_tree_from_data_sync(
dep_data_node, store_name_for_cache
)
except Exception as e:
logger.error(
f"Error building dependency '{dep_uri}' for asset '{construction_data.uri}': {e}",
@@ -253,7 +315,8 @@ class AssetManager:
direct_deps_uris_strs: Set[str] = set()
if construction_data.dependencies_data is not None:
direct_deps_uris_strs = {
str(uri) for uri in construction_data.dependencies_data.keys()
str(uri)
for uri in construction_data.dependencies_data.keys()
}
raw_data_size = len(construction_data.raw_data)
self.asset_cache.put(
@@ -267,7 +330,7 @@ class AssetManager:
def get(
self,
uri: Union[AssetUri, str],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> Asset:
"""
@@ -278,12 +341,14 @@ class AssetManager:
"""
# Log entry with thread info for verification
calling_thread_name = threading.current_thread().name
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get(uri='{uri}', store='{store}', depth='{depth}') called from thread: {calling_thread_name}"
f"AssetManager.get(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {calling_thread_name}"
)
if (
QtGui.QApplication.instance()
and QtCore.QThread.currentThread() is not QtGui.QApplication.instance().thread()
and QtCore.QThread.currentThread()
is not QtGui.QApplication.instance().thread()
):
logger.warning(
"AssetManager.get() called from a non-main thread! UI in from_bytes may fail!"
@@ -298,7 +363,7 @@ class AssetManager:
)
all_construction_data = asyncio.run(
self._fetch_asset_construction_data_recursive_async(
asset_uri_obj, store, set(), depth
asset_uri_obj, stores_list, set(), depth
)
)
logger.debug(
@@ -313,7 +378,9 @@ class AssetManager:
if all_construction_data is None:
# This means the top-level asset itself was not found by _fetch_...
raise FileNotFoundError(f"Asset '{asset_uri_obj}' not found in store '{store}'.")
raise FileNotFoundError(
f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'."
)
# Step 2: Synchronously build the asset tree (and call from_bytes)
# This happens in the current thread (which is assumed to be the main UI thread)
@@ -331,16 +398,20 @@ class AssetManager:
f"Get: Starting synchronous asset tree build for '{asset_uri_obj}' "
f"and {deps_count} dependencies ({found_deps_count} resolved)."
)
# Use the first store from the list for caching purposes
store_name_for_cache = stores_list[0] if stores_list else "local"
final_asset = self._build_asset_tree_from_data_sync(
all_construction_data, store_name_for_cache=store
all_construction_data, store_name_for_cache=store_name_for_cache
)
logger.debug(
f"Get: Synchronous asset tree build for '{asset_uri_obj}' completed."
)
logger.debug(f"Get: Synchronous asset tree build for '{asset_uri_obj}' completed.")
return final_asset
def get_or_none(
self,
uri: Union[AssetUri, str],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> Asset | None:
"""
@@ -355,7 +426,7 @@ class AssetManager:
async def get_async(
self,
uri: Union[AssetUri, str],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> Optional[Asset]:
"""
@@ -365,21 +436,24 @@ class AssetManager:
If awaited from a plain worker thread's asyncio loop, from_bytes will run on that worker.
"""
calling_thread_name = threading.current_thread().name
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get_async(uri='{uri}', store='{store}', depth='{depth}') called from thread: {calling_thread_name}"
f"AssetManager.get_async(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {calling_thread_name}"
)
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
all_construction_data = await self._fetch_asset_construction_data_recursive_async(
asset_uri_obj, store, set(), depth
all_construction_data = (
await self._fetch_asset_construction_data_recursive_async(
asset_uri_obj, stores_list, set(), depth
)
)
if all_construction_data is None:
# Consistent with get(), if the top-level asset is not found,
# raise FileNotFoundError.
raise FileNotFoundError(
f"Asset '{asset_uri_obj}' not found in store '{store}' (async path)."
f"Asset '{asset_uri_obj}' not found in stores '{stores_list}' (async path)."
)
# return None # Alternative: if Optional[Asset] means asset might not exist
@@ -391,25 +465,46 @@ class AssetManager:
all_construction_data, store_name_for_cache=store
)
def get_raw(self, uri: Union[AssetUri, str], store: str = "local") -> bytes:
def get_raw(
self,
uri: Union[AssetUri, str],
store: Union[str, Sequence[str]] = "local",
) -> bytes:
"""Retrieves raw asset data by its URI (synchronous wrapper)."""
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get_raw(uri='{uri}', store='{store}') from T:{threading.current_thread().name}"
f"AssetManager.get_raw(uri='{uri}', stores='{stores_list}') from T:{threading.current_thread().name}"
)
async def _fetch_raw_async():
async def _fetch_raw_async(stores_list: Sequence[str]):
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
logger.debug(
f"GetRawAsync (internal): Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
f"GetRawAsync (internal): Trying stores '{stores_list}'. Available stores: {list(self.stores.keys())}"
)
for current_store_name in stores_list:
store = self.stores.get(current_store_name)
if not store:
logger.warning(
f"Store '{current_store_name}' not registered. Skipping."
)
continue
try:
raw_data = await store.get(asset_uri_obj)
logger.debug(
f"GetRawAsync: Asset {asset_uri_obj} found in store {current_store_name}"
)
return raw_data
except FileNotFoundError:
logger.debug(
f"GetRawAsync: Asset {asset_uri_obj} not found in store {current_store_name}"
)
continue
raise FileNotFoundError(
f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'."
)
try:
selected_store = self.stores[store]
except KeyError:
raise ValueError(f"No store registered for name: {store}")
return await selected_store.get(asset_uri_obj)
try:
return asyncio.run(_fetch_raw_async())
return asyncio.run(_fetch_raw_async(stores_list))
except Exception as e:
logger.error(
f"GetRaw: Error during asyncio.run for '{uri}': {e}",
@@ -417,31 +512,58 @@ class AssetManager:
)
raise
async def get_raw_async(self, uri: Union[AssetUri, str], store: str = "local") -> bytes:
async def get_raw_async(
self,
uri: Union[AssetUri, str],
store: Union[str, Sequence[str]] = "local",
) -> bytes:
"""Retrieves raw asset data by its URI (asynchronous)."""
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get_raw_async(uri='{uri}', store='{store}') from T:{threading.current_thread().name}"
f"AssetManager.get_raw_async(uri='{uri}', stores='{stores_list}') from T:{threading.current_thread().name}"
)
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
selected_store = self.stores[store]
return await selected_store.get(asset_uri_obj)
for current_store_name in stores_list:
store = self.stores.get(current_store_name)
if not store:
logger.warning(
f"Store '{current_store_name}' not registered. Skipping."
)
continue
try:
raw_data = await store.get(asset_uri_obj)
logger.debug(
f"GetRawAsync: Asset {asset_uri_obj} found in store {current_store_name}"
)
return raw_data
except FileNotFoundError:
logger.debug(
f"GetRawAsync: Asset {asset_uri_obj} not found in store {current_store_name}"
)
continue
raise FileNotFoundError(
f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'."
)
def get_bulk(
self,
uris: Sequence[Union[AssetUri, str]],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> List[Any]:
"""Retrieves multiple assets by their URIs (synchronous wrapper), to a specified depth."""
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get_bulk for {len(uris)} URIs from store '{store}', depth '{depth}'"
f"AssetManager.get_bulk for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'"
)
async def _fetch_all_construction_data_bulk_async():
tasks = [
self._fetch_asset_construction_data_recursive_async(
AssetUri(u) if isinstance(u, str) else u,
store,
stores_list,
set(),
depth,
)
@@ -453,9 +575,13 @@ class AssetManager:
try:
logger.debug("GetBulk: Starting bulk data fetching")
all_construction_data_list = asyncio.run(_fetch_all_construction_data_bulk_async())
all_construction_data_list = asyncio.run(
_fetch_all_construction_data_bulk_async()
)
logger.debug("GetBulk: bulk data fetching completed")
except Exception as e: # Should ideally not happen if gather returns exceptions
except (
Exception
) as e: # Should ideally not happen if gather returns exceptions
logger.error(
f"GetBulk: Unexpected error during asyncio.run for bulk data: {e}",
exc_info=False,
@@ -474,11 +600,21 @@ class AssetManager:
raise data_or_exc
elif isinstance(data_or_exc, _AssetConstructionData):
# Build asset instance synchronously. Exceptions during build should propagate.
assets.append(
self._build_asset_tree_from_data_sync(data_or_exc, store_name_for_cache=store)
# Use the first store from the list for caching purposes in build_asset_tree
store_name_for_cache = (
stores_list[0] if stores_list else "local"
)
assets.append(
self._build_asset_tree_from_data_sync(
data_or_exc, store_name_for_cache=store_name_for_cache
)
)
elif (
data_or_exc is None
): # From _fetch_... returning None for not found
logger.debug(
f"GetBulk: Asset '{original_uri_input}' not found"
)
elif data_or_exc is None: # From _fetch_... returning None for not found
logger.debug(f"GetBulk: Asset '{original_uri_input}' not found")
assets.append(None)
else: # Should not happen
logger.error(
@@ -493,28 +629,43 @@ class AssetManager:
async def get_bulk_async(
self,
uris: Sequence[Union[AssetUri, str]],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> List[Any]:
"""Retrieves multiple assets by their URIs (asynchronous), to a specified depth."""
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"AssetManager.get_bulk_async for {len(uris)} URIs from store '{store}', depth '{depth}'"
f"AssetManager.get_bulk_async for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'"
)
tasks = [
self._fetch_asset_construction_data_recursive_async(
AssetUri(u) if isinstance(u, str) else u, store, set(), depth
AssetUri(u) if isinstance(u, str) else u,
stores_list,
set(),
depth,
)
for u in uris
]
all_construction_data_list = await asyncio.gather(*tasks, return_exceptions=True)
all_construction_data_list = await asyncio.gather(
*tasks, return_exceptions=True
)
assets = []
for i, data_or_exc in enumerate(all_construction_data_list):
if isinstance(data_or_exc, _AssetConstructionData):
assets.append(
self._build_asset_tree_from_data_sync(data_or_exc, store_name_for_cache=store)
# Use the first store from the list for caching purposes in build_asset_tree
store_name_for_cache = (
stores_list[0] if stores_list else "local"
)
elif isinstance(data_or_exc, FileNotFoundError) or data_or_exc is None:
assets.append(
self._build_asset_tree_from_data_sync(
data_or_exc, store_name_for_cache=store_name_for_cache
)
)
elif (
isinstance(data_or_exc, FileNotFoundError)
or data_or_exc is None
):
assets.append(None)
elif isinstance(data_or_exc, Exception):
assets.append(data_or_exc) # Caller must check
@@ -523,25 +674,47 @@ class AssetManager:
def exists(
self,
uri: Union[AssetUri, str],
store: str = "local",
store: Union[str, Sequence[str]] = "local",
) -> bool:
"""
Returns True if the asset exists, False otherwise.
Returns True if the asset exists in any of the specified stores, False otherwise.
"""
async def _exists_async():
async def _exists_async(stores_list: Sequence[str]):
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
logger.debug(
f"ExistsAsync (internal): Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
f"ExistsAsync (internal): Trying stores '{stores_list}'. Available stores: {list(self.stores.keys())}"
)
try:
selected_store = self.stores[store]
except KeyError:
raise ValueError(f"No store registered for name: {store}")
return await selected_store.exists(asset_uri_obj)
for current_store_name in stores_list:
store = self.stores.get(current_store_name)
if not store:
logger.warning(
f"Store '{current_store_name}' not registered. Skipping."
)
continue
try:
exists = await store.exists(asset_uri_obj)
if exists:
logger.debug(
f"ExistsAsync: Asset {asset_uri_obj} found in store {current_store_name}"
)
return True
else:
logger.debug(
f"ExistsAsync: Asset {asset_uri_obj} not found in store {current_store_name}"
)
continue
except Exception as e:
logger.error(
f"ExistsAsync: Error checking store '{current_store_name}': {e}",
exc_info=False,
)
continue # Try next store
return False # Not found in any store
stores_list = [store] if isinstance(store, str) else store
try:
return asyncio.run(_exists_async())
return asyncio.run(_exists_async(stores_list))
except Exception as e:
logger.error(
f"AssetManager.exists: Error during asyncio.run for '{uri}': {e}",
@@ -554,15 +727,22 @@ class AssetManager:
asset_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> List[Asset]:
"""Fetches asset instances based on type, limit, and offset (synchronous), to a specified depth."""
logger.debug(f"Fetch(type='{asset_type}', store='{store}', depth='{depth}')")
asset_uris = self.list_assets(
asset_type, limit, offset, store
) # list_assets doesn't need depth
results = self.get_bulk(asset_uris, store, depth) # Pass depth to get_bulk
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"Fetch(type='{asset_type}', stores='{stores_list}', depth='{depth}')"
)
# Note: list_assets currently only supports a single store.
# If fetching from multiple stores is needed for listing, this needs
# to be updated. For now, list from the first store.
list_store = stores_list[0] if stores_list else "local"
asset_uris = self.list_assets(asset_type, limit, offset, list_store)
results = self.get_bulk(
asset_uris, stores_list, depth
) # Pass stores_list to get_bulk
# Filter out non-Asset objects (e.g., None for not found, or exceptions if collected)
return [asset for asset in results if isinstance(asset, Asset)]
@@ -571,17 +751,24 @@ class AssetManager:
asset_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
depth: Optional[int] = None,
) -> List[Asset]:
"""Fetches asset instances based on type, limit, and offset (asynchronous), to a specified depth."""
logger.debug(f"FetchAsync(type='{asset_type}', store='{store}', depth='{depth}')")
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"FetchAsync(type='{asset_type}', stores='{stores_list}', depth='{depth}')"
)
# Note: list_assets_async currently only supports a single store.
# If fetching from multiple stores is needed for listing, this needs
# to be updated. For now, list from the first store.
list_store = stores_list[0] if stores_list else "local"
asset_uris = await self.list_assets_async(
asset_type, limit, offset, store # list_assets_async doesn't need depth
asset_type, limit, offset, list_store
)
results = await self.get_bulk_async(
asset_uris, store, depth
) # Pass depth to get_bulk_async
asset_uris, stores_list, depth
) # Pass stores_list to get_bulk_async
return [asset for asset in results if isinstance(asset, Asset)]
def list_assets(
@@ -589,49 +776,79 @@ class AssetManager:
asset_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
logger.debug(f"ListAssets(type='{asset_type}', store='{store}')")
return asyncio.run(self.list_assets_async(asset_type, limit, offset, store))
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"ListAssets(type='{asset_type}', stores='{stores_list}')"
)
# Note: list_assets_async currently only supports a single store.
# If listing from multiple stores is needed, this needs to be updated.
# For now, list from the first store.
list_store = stores_list[0] if stores_list else "local"
return asyncio.run(
self.list_assets_async(asset_type, limit, offset, list_store)
)
async def list_assets_async(
self,
asset_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
logger.debug(f"ListAssetsAsync executing for type='{asset_type}', store='{store}'")
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"ListAssetsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
f"ListAssetsAsync executing for type='{asset_type}', stores='{stores_list}'"
)
# Note: list_assets_async currently only supports a single store.
# If listing from multiple stores is needed, this needs to be updated.
# For now, list from the first store.
list_store = stores_list[0] if stores_list else "local"
logger.debug(
f"ListAssetsAsync: Looking up store '{list_store}'. Available stores: {list(self.stores.keys())}"
)
try:
selected_store = self.stores[store]
selected_store = self.stores[list_store]
except KeyError:
raise ValueError(f"No store registered for name: {store}")
raise ValueError(f"No store registered for name: {list_store}")
return await selected_store.list_assets(asset_type, limit, offset)
def count_assets(
self,
asset_type: Optional[str] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
) -> int:
logger.debug(f"CountAssets(type='{asset_type}', store='{store}')")
return asyncio.run(self.count_assets_async(asset_type, store))
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"CountAssets(type='{asset_type}', stores='{stores_list}')"
)
# Note: count_assets_async currently only supports a single store.
# If counting across multiple stores is needed, this needs to be updated.
# For now, count from the first store.
count_store = stores_list[0] if stores_list else "local"
return asyncio.run(self.count_assets_async(asset_type, count_store))
async def count_assets_async(
self,
asset_type: Optional[str] = None,
store: str = "local",
store: Union[str, Sequence[str]] = "local",
) -> int:
logger.debug(f"CountAssetsAsync executing for type='{asset_type}', store='{store}'")
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"CountAssetsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
f"CountAssetsAsync executing for type='{asset_type}', stores='{stores_list}'"
)
# Note: count_assets_async currently only supports a single store.
# If counting across multiple stores is needed, this needs to be updated.
# For now, count from the first store.
count_store = stores_list[0] if stores_list else "local"
logger.debug(
f"CountAssetsAsync: Looking up store '{count_store}'. Available stores: {list(self.stores.keys())}"
)
try:
selected_store = self.stores[store]
selected_store = self.stores[count_store]
except KeyError:
raise ValueError(f"No store registered for name: {store}")
raise ValueError(f"No store registered for name: {count_store}")
return await selected_store.count_assets(asset_type)
def _is_registered_type(self, obj: Asset) -> bool:
@@ -646,14 +863,20 @@ class AssetManager:
Adds an asset to the store, either creating a new one or updating an existing one.
Uses obj.get_url() to determine if the asset exists.
"""
logger.debug(f"AddAsync: Adding {type(obj).__name__} to store '{store}'")
logger.debug(
f"AddAsync: Adding {type(obj).__name__} to store '{store}'"
)
uri = obj.get_uri()
if not self._is_registered_type(obj):
logger.warning(f"Asset has unregistered type '{uri.asset_type}' ({type(obj).__name__})")
logger.warning(
f"Asset has unregistered type '{uri.asset_type}' ({type(obj).__name__})"
)
serializer = self.get_serializer_for_class(obj.__class__)
data = obj.to_bytes(serializer)
return await self.add_raw_async(uri.asset_type, uri.asset_id, data, store)
return await self.add_raw_async(
uri.asset_type, uri.asset_id, data, store
)
def add(self, obj: Asset, store: str = "local") -> AssetUri:
"""Synchronous wrapper for adding an asset to the store."""
@@ -668,9 +891,13 @@ class AssetManager:
"""
Adds raw asset data to the store, either creating a new asset or updating an existing one.
"""
logger.debug(f"AddRawAsync: type='{asset_type}', id='{asset_id}', store='{store}'")
logger.debug(
f"AddRawAsync: type='{asset_type}', id='{asset_id}', store='{store}'"
)
if not asset_type or not asset_id:
raise ValueError("asset_type and asset_id must be provided for add_raw.")
raise ValueError(
"asset_type and asset_id must be provided for add_raw."
)
if not isinstance(data, bytes):
raise TypeError("Data for add_raw must be bytes.")
selected_store = self.stores.get(store)
@@ -687,7 +914,9 @@ class AssetManager:
uri = await selected_store.create(asset_type, asset_id, data)
if store in self._cacheable_stores:
self.asset_cache.invalidate_for_uri(str(uri)) # Invalidate after add/update
self.asset_cache.invalidate_for_uri(
str(uri)
) # Invalidate after add/update
return uri
def add_raw(
@@ -698,10 +927,13 @@ class AssetManager:
f"AddRaw: type='{asset_type}', id='{asset_id}', store='{store}' from T:{threading.current_thread().name}"
)
try:
return asyncio.run(self.add_raw_async(asset_type, asset_id, data, store))
return asyncio.run(
self.add_raw_async(asset_type, asset_id, data, store)
)
except Exception as e:
logger.error(
f"AddRaw: Error for type='{asset_type}', id='{asset_id}': {e}", exc_info=False
f"AddRaw: Error for type='{asset_type}', id='{asset_id}': {e}",
exc_info=False,
)
raise
@@ -716,7 +948,9 @@ class AssetManager:
Convenience wrapper around add_raw().
If asset_id is None, the path.stem is used as the id.
"""
return self.add_raw(asset_type, asset_id or path.stem, path.read_bytes(), store=store)
return self.add_raw(
asset_type, asset_id or path.stem, path.read_bytes(), store=store
)
def delete(self, uri: Union[AssetUri, str], store: str = "local") -> None:
logger.debug(f"Delete URI '{uri}' from store '{store}'")
@@ -730,7 +964,9 @@ class AssetManager:
asyncio.run(_do_delete_async())
async def delete_async(self, uri: Union[AssetUri, str], store: str = "local") -> None:
async def delete_async(
self, uri: Union[AssetUri, str], store: str = "local"
) -> None:
logger.debug(f"DeleteAsync URI '{uri}' from store '{store}'")
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
selected_store = self.stores[store]
@@ -738,7 +974,9 @@ class AssetManager:
if store in self._cacheable_stores:
self.asset_cache.invalidate_for_uri(str(asset_uri_obj))
async def is_empty_async(self, asset_type: Optional[str] = None, store: str = "local") -> bool:
async def is_empty_async(
self, asset_type: Optional[str] = None, store: str = "local"
) -> bool:
"""Checks if the asset store has any assets of a given type (asynchronous)."""
logger.debug(f"IsEmptyAsync: type='{asset_type}', store='{store}'")
logger.debug(
@@ -749,7 +987,9 @@ class AssetManager:
raise ValueError(f"No store registered for name: {store}")
return await selected_store.is_empty(asset_type)
def is_empty(self, asset_type: Optional[str] = None, store: str = "local") -> bool:
def is_empty(
self, asset_type: Optional[str] = None, store: str = "local"
) -> bool:
"""Checks if the asset store has any assets of a given type (synchronous wrapper)."""
logger.debug(
f"IsEmpty: type='{asset_type}', store='{store}' from T:{threading.current_thread().name}"
@@ -764,32 +1004,44 @@ class AssetManager:
raise
async def list_versions_async(
self, uri: Union[AssetUri, str], store: str = "local"
self,
uri: Union[AssetUri, str],
store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
"""Lists available versions for a given asset URI (asynchronous)."""
logger.debug(f"ListVersionsAsync: uri='{uri}', store='{store}'")
stores_list = [store] if isinstance(store, str) else store
logger.debug(f"ListVersionsAsync: uri='{uri}', stores='{stores_list}'")
asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
# Note: list_versions_async currently only supports a single store.
# If listing versions across multiple stores is needed, this needs to be updated.
# For now, list from the first store.
list_store = stores_list[0] if stores_list else "local"
logger.debug(
f"ListVersionsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
f"ListVersionsAsync: Looking up store '{list_store}'. Available stores: {list(self.stores.keys())}"
)
selected_store = self.stores.get(store)
selected_store = self.stores.get(list_store)
if not selected_store:
raise ValueError(f"No store registered for name: {store}")
raise ValueError(f"No store registered for name: {list_store}")
return await selected_store.list_versions(asset_uri_obj)
def list_versions(self, uri: Union[AssetUri, str], store: str = "local") -> List[AssetUri]:
def list_versions(
self,
uri: Union[AssetUri, str],
store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
"""Lists available versions for a given asset URI (synchronous wrapper)."""
stores_list = [store] if isinstance(store, str) else store
logger.debug(
f"ListVersions: uri='{uri}', store='{store}' from T:{threading.current_thread().name}"
f"ListVersions: uri='{uri}', stores='{stores_list}' from T:{threading.current_thread().name}"
)
try:
return asyncio.run(self.list_versions_async(uri, store))
return asyncio.run(self.list_versions_async(uri, stores_list))
except Exception as e:
logger.error(
f"ListVersions: Error for uri='{uri}', store='{store}': {e}",
f"ListVersions: Error for uri='{uri}', stores='{stores_list}': {e}",
exc_info=False,
) # Changed exc_info to False
)
return [] # Return empty list on error to satisfy type hint
def get_registered_asset_types(self) -> List[str]: