diff --git a/src/Mod/CAM/CAMTests/TestPathToolAssetManager.py b/src/Mod/CAM/CAMTests/TestPathToolAssetManager.py index d014475bc1..ea5fd91ec2 100644 --- a/src/Mod/CAM/CAMTests/TestPathToolAssetManager.py +++ b/src/Mod/CAM/CAMTests/TestPathToolAssetManager.py @@ -309,9 +309,12 @@ class TestPathToolAssetManager(unittest.TestCase): # Test error handling (store not found) non_existent_uri = AssetUri("type://id/1") - with self.assertRaises(ValueError) as cm: + # Test error handling (asset not found in any store, including non-existent ones) + non_existent_uri = AssetUri("type://id/1") + with self.assertRaises(FileNotFoundError) as cm: manager.get_raw(non_existent_uri, store="non_existent_store") - self.assertIn("No store registered for name:", str(cm.exception)) + self.assertIn("Asset 'type://id/1' not found in stores '['non_existent_store']'.", + str(cm.exception)) def test_is_empty(self): # Setup AssetManager with a real MemoryStore @@ -400,9 +403,9 @@ class TestPathToolAssetManager(unittest.TestCase): self.assertIsNone(retrieved_assets[2]) # Test error handling (store not found) - with self.assertRaises(ValueError) as cm: - manager.get_bulk(uris, store="non_existent_store") - self.assertIn("No store registered for name:", str(cm.exception)) + # Test handling of non-existent store (should skip and not raise ValueError) + # The test already asserts that the non-existent asset is None, which is the expected behavior. + manager.get_bulk(uris, store="non_existent_store") def test_fetch(self): # Setup AssetManager with a real MemoryStore and MockAsset class diff --git a/src/Mod/CAM/Path/Tool/assets/manager.py b/src/Mod/CAM/Path/Tool/assets/manager.py index 0f8d88ec5f..537ec2a7c6 100644 --- a/src/Mod/CAM/Path/Tool/assets/manager.py +++ b/src/Mod/CAM/Path/Tool/assets/manager.py @@ -24,7 +24,18 @@ import asyncio import threading import pathlib import hashlib -from typing import Dict, Any, Type, Optional, List, Sequence, Union, Set, Mapping, Tuple +from typing import ( + Dict, + Any, + Type, + Optional, + List, + Sequence, + Union, + Set, + Mapping, + Tuple, +) from dataclasses import dataclass from PySide import QtCore, QtGui from .store.base import AssetStore @@ -41,11 +52,14 @@ logger = logging.getLogger(__name__) class _AssetConstructionData: """Holds raw data and type info needed to construct an asset instance.""" + store: str # Name of the store where this asset was found uri: AssetUri raw_data: bytes asset_class: Type[Asset] # Stores AssetConstructionData for dependencies, keyed by their AssetUri - dependencies_data: Optional[Dict[AssetUri, Optional["_AssetConstructionData"]]] = None + dependencies_data: Optional[ + Dict[AssetUri, Optional["_AssetConstructionData"]] + ] = None class AssetManager: @@ -55,11 +69,15 @@ class AssetManager: self._asset_classes: Dict[str, Type[Asset]] = {} self.asset_cache = AssetCache(max_size_bytes=cache_max_size_bytes) self._cacheable_stores: Set[str] = set() - logger.debug(f"AssetManager initialized (Thread: {threading.current_thread().name})") + logger.debug( + f"AssetManager initialized (Thread: {threading.current_thread().name})" + ) def register_store(self, store: AssetStore, cacheable: bool = False): """Registers an AssetStore with the manager.""" - logger.debug(f"Registering store: {store.name}, cacheable: {cacheable}") + logger.debug( + f"Registering store: {store.name}, cacheable: {cacheable}" + ) self.stores[store.name] = store if cacheable: self._cacheable_stores.add(store.name) @@ -70,32 +88,42 @@ class AssetManager: return serializer raise ValueError(f"No serializer found for class {asset_class}") - def register_asset(self, asset_class: Type[Asset], serializer: Type[AssetSerializer]): + def register_asset( + self, asset_class: Type[Asset], serializer: Type[AssetSerializer] + ): """Registers an Asset class with the manager.""" if not issubclass(asset_class, Asset): - raise TypeError(f"Item '{asset_class.__name__}' must be a subclass of Asset.") + raise TypeError( + f"Item '{asset_class.__name__}' must be a subclass of Asset." + ) if not issubclass(serializer, AssetSerializer): - raise TypeError(f"Item '{serializer.__name__}' must be a subclass of AssetSerializer.") + raise TypeError( + f"Item '{serializer.__name__}' must be a subclass of AssetSerializer." + ) self._serializers.append((serializer, asset_class)) asset_type_name = getattr(asset_class, "asset_type", None) - if not isinstance(asset_type_name, str) or not asset_type_name: # Ensure not empty + if ( + not isinstance(asset_type_name, str) or not asset_type_name + ): # Ensure not empty raise TypeError( f"Asset class '{asset_class.__name__}' must have a non-empty string 'asset_type' attribute." ) - logger.debug(f"Registering asset type: '{asset_type_name}' -> {asset_class.__name__}") + logger.debug( + f"Registering asset type: '{asset_type_name}' -> {asset_class.__name__}" + ) self._asset_classes[asset_type_name] = asset_class async def _fetch_asset_construction_data_recursive_async( self, uri: AssetUri, - store_name: str, + store_names: Sequence[str], visited_uris: Set[AssetUri], depth: Optional[int] = None, ) -> Optional[_AssetConstructionData]: logger.debug( - f"_fetch_asset_construction_data_recursive_async called {store_name} {uri} {depth}" + f"_fetch_asset_construction_data_recursive_async called {store_names} {uri} {depth}" ) if uri in visited_uris: @@ -103,24 +131,45 @@ class AssetManager: raise RuntimeError(f"Cyclic dependency encountered for URI: {uri}") # Check arguments - store = self.stores.get(store_name) - if not store: - raise ValueError(f"No store registered for name: {store_name}") + if not store_names: + raise ValueError("At least one store name must be provided.") + asset_class = self._asset_classes.get(uri.asset_type) if not asset_class: - raise ValueError(f"No asset class registered for asset type: {asset_class}") - - # Fetch the requested asset - try: - raw_data = await store.get(uri) - except FileNotFoundError: - logger.debug( - f"_fetch_asset_construction_data_recursive_async: Asset not found for {uri}" + raise ValueError( + f"No asset class registered for asset type: {asset_class}" ) - return None # Primary asset not found + + # Fetch the requested asset, trying each store in order + raw_data = None + found_store_name = None + for current_store_name in store_names: + store = self.stores.get(current_store_name) + if not store: + logger.warning( + f"Store '{current_store_name}' not registered. Skipping." + ) + continue + + try: + raw_data = await store.get(uri) + found_store_name = current_store_name + logger.debug( + f"_fetch_asset_construction_data_recursive_async: Asset {uri} found in store {found_store_name}" + ) + break # Asset found, no need to check other stores + except FileNotFoundError: + logger.debug( + f"_fetch_asset_construction_data_recursive_async: Asset {uri} not found in store {current_store_name}" + ) + continue # Try next store + + if raw_data is None: + return None # Asset not found in any store if depth == 0: return _AssetConstructionData( + store=found_store_name, uri=uri, raw_data=raw_data, asset_class=asset_class, @@ -129,18 +178,23 @@ class AssetManager: # Extract the list of dependencies (non-recursive) serializer = self.get_serializer_for_class(asset_class) - dependency_uris = asset_class.extract_dependencies(raw_data, serializer) + dependency_uris = asset_class.extract_dependencies( + raw_data, serializer + ) # Initialize deps_construction_data_map. Any dependencies mapped to None # indicate that dependencies were intentionally not fetched. - deps_construction_data: Dict[AssetUri, Optional[_AssetConstructionData]] = {} + deps_construction_data: Dict[ + AssetUri, Optional[_AssetConstructionData] + ] = {} for dep_uri in dependency_uris: visited_uris.add(uri) try: + # For dependencies, use the same list of stores for fallback dep_data = await self._fetch_asset_construction_data_recursive_async( dep_uri, - store_name, + store_names, # Pass the list of stores for dependency fallback visited_uris, None if depth is None else depth - 1, ) @@ -153,6 +207,7 @@ class AssetManager: ) return _AssetConstructionData( + store=found_store_name, uri=uri, raw_data=raw_data, asset_class=asset_class, @@ -171,10 +226,15 @@ class AssetManager: deps_signature_tuple: Tuple = ("shallow_children",) else: deps_signature_tuple = tuple( - sorted(str(uri) for uri in construction_data.dependencies_data.keys()) + sorted( + str(uri) + for uri in construction_data.dependencies_data.keys() + ) ) - raw_data_hash = int(hashlib.sha256(construction_data.raw_data).hexdigest(), 16) + raw_data_hash = int( + hashlib.sha256(construction_data.raw_data).hexdigest(), 16 + ) return CacheKey( store_name=store_name_for_cache, @@ -223,7 +283,9 @@ class AssetManager: # this would need more complex store_name propagation. # For now, use the parent's store_name_for_cache. try: - dep = self._build_asset_tree_from_data_sync(dep_data_node, store_name_for_cache) + dep = self._build_asset_tree_from_data_sync( + dep_data_node, store_name_for_cache + ) except Exception as e: logger.error( f"Error building dependency '{dep_uri}' for asset '{construction_data.uri}': {e}", @@ -253,7 +315,8 @@ class AssetManager: direct_deps_uris_strs: Set[str] = set() if construction_data.dependencies_data is not None: direct_deps_uris_strs = { - str(uri) for uri in construction_data.dependencies_data.keys() + str(uri) + for uri in construction_data.dependencies_data.keys() } raw_data_size = len(construction_data.raw_data) self.asset_cache.put( @@ -267,7 +330,7 @@ class AssetManager: def get( self, uri: Union[AssetUri, str], - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> Asset: """ @@ -278,12 +341,14 @@ class AssetManager: """ # Log entry with thread info for verification calling_thread_name = threading.current_thread().name + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get(uri='{uri}', store='{store}', depth='{depth}') called from thread: {calling_thread_name}" + f"AssetManager.get(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {calling_thread_name}" ) if ( QtGui.QApplication.instance() - and QtCore.QThread.currentThread() is not QtGui.QApplication.instance().thread() + and QtCore.QThread.currentThread() + is not QtGui.QApplication.instance().thread() ): logger.warning( "AssetManager.get() called from a non-main thread! UI in from_bytes may fail!" @@ -298,7 +363,7 @@ class AssetManager: ) all_construction_data = asyncio.run( self._fetch_asset_construction_data_recursive_async( - asset_uri_obj, store, set(), depth + asset_uri_obj, stores_list, set(), depth ) ) logger.debug( @@ -313,7 +378,9 @@ class AssetManager: if all_construction_data is None: # This means the top-level asset itself was not found by _fetch_... - raise FileNotFoundError(f"Asset '{asset_uri_obj}' not found in store '{store}'.") + raise FileNotFoundError( + f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'." + ) # Step 2: Synchronously build the asset tree (and call from_bytes) # This happens in the current thread (which is assumed to be the main UI thread) @@ -331,16 +398,20 @@ class AssetManager: f"Get: Starting synchronous asset tree build for '{asset_uri_obj}' " f"and {deps_count} dependencies ({found_deps_count} resolved)." ) + # Use the first store from the list for caching purposes + store_name_for_cache = stores_list[0] if stores_list else "local" final_asset = self._build_asset_tree_from_data_sync( - all_construction_data, store_name_for_cache=store + all_construction_data, store_name_for_cache=store_name_for_cache + ) + logger.debug( + f"Get: Synchronous asset tree build for '{asset_uri_obj}' completed." ) - logger.debug(f"Get: Synchronous asset tree build for '{asset_uri_obj}' completed.") return final_asset def get_or_none( self, uri: Union[AssetUri, str], - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> Asset | None: """ @@ -355,7 +426,7 @@ class AssetManager: async def get_async( self, uri: Union[AssetUri, str], - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> Optional[Asset]: """ @@ -365,21 +436,24 @@ class AssetManager: If awaited from a plain worker thread's asyncio loop, from_bytes will run on that worker. """ calling_thread_name = threading.current_thread().name + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get_async(uri='{uri}', store='{store}', depth='{depth}') called from thread: {calling_thread_name}" + f"AssetManager.get_async(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {calling_thread_name}" ) asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri - all_construction_data = await self._fetch_asset_construction_data_recursive_async( - asset_uri_obj, store, set(), depth + all_construction_data = ( + await self._fetch_asset_construction_data_recursive_async( + asset_uri_obj, stores_list, set(), depth + ) ) if all_construction_data is None: # Consistent with get(), if the top-level asset is not found, # raise FileNotFoundError. raise FileNotFoundError( - f"Asset '{asset_uri_obj}' not found in store '{store}' (async path)." + f"Asset '{asset_uri_obj}' not found in stores '{stores_list}' (async path)." ) # return None # Alternative: if Optional[Asset] means asset might not exist @@ -391,25 +465,46 @@ class AssetManager: all_construction_data, store_name_for_cache=store ) - def get_raw(self, uri: Union[AssetUri, str], store: str = "local") -> bytes: + def get_raw( + self, + uri: Union[AssetUri, str], + store: Union[str, Sequence[str]] = "local", + ) -> bytes: """Retrieves raw asset data by its URI (synchronous wrapper).""" + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get_raw(uri='{uri}', store='{store}') from T:{threading.current_thread().name}" + f"AssetManager.get_raw(uri='{uri}', stores='{stores_list}') from T:{threading.current_thread().name}" ) - async def _fetch_raw_async(): + async def _fetch_raw_async(stores_list: Sequence[str]): asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri logger.debug( - f"GetRawAsync (internal): Looking up store '{store}'. Available stores: {list(self.stores.keys())}" + f"GetRawAsync (internal): Trying stores '{stores_list}'. Available stores: {list(self.stores.keys())}" + ) + for current_store_name in stores_list: + store = self.stores.get(current_store_name) + if not store: + logger.warning( + f"Store '{current_store_name}' not registered. Skipping." + ) + continue + try: + raw_data = await store.get(asset_uri_obj) + logger.debug( + f"GetRawAsync: Asset {asset_uri_obj} found in store {current_store_name}" + ) + return raw_data + except FileNotFoundError: + logger.debug( + f"GetRawAsync: Asset {asset_uri_obj} not found in store {current_store_name}" + ) + continue + raise FileNotFoundError( + f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'." ) - try: - selected_store = self.stores[store] - except KeyError: - raise ValueError(f"No store registered for name: {store}") - return await selected_store.get(asset_uri_obj) try: - return asyncio.run(_fetch_raw_async()) + return asyncio.run(_fetch_raw_async(stores_list)) except Exception as e: logger.error( f"GetRaw: Error during asyncio.run for '{uri}': {e}", @@ -417,31 +512,58 @@ class AssetManager: ) raise - async def get_raw_async(self, uri: Union[AssetUri, str], store: str = "local") -> bytes: + async def get_raw_async( + self, + uri: Union[AssetUri, str], + store: Union[str, Sequence[str]] = "local", + ) -> bytes: """Retrieves raw asset data by its URI (asynchronous).""" + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get_raw_async(uri='{uri}', store='{store}') from T:{threading.current_thread().name}" + f"AssetManager.get_raw_async(uri='{uri}', stores='{stores_list}') from T:{threading.current_thread().name}" ) asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri - selected_store = self.stores[store] - return await selected_store.get(asset_uri_obj) + + for current_store_name in stores_list: + store = self.stores.get(current_store_name) + if not store: + logger.warning( + f"Store '{current_store_name}' not registered. Skipping." + ) + continue + try: + raw_data = await store.get(asset_uri_obj) + logger.debug( + f"GetRawAsync: Asset {asset_uri_obj} found in store {current_store_name}" + ) + return raw_data + except FileNotFoundError: + logger.debug( + f"GetRawAsync: Asset {asset_uri_obj} not found in store {current_store_name}" + ) + continue + + raise FileNotFoundError( + f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'." + ) def get_bulk( self, uris: Sequence[Union[AssetUri, str]], - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> List[Any]: """Retrieves multiple assets by their URIs (synchronous wrapper), to a specified depth.""" + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get_bulk for {len(uris)} URIs from store '{store}', depth '{depth}'" + f"AssetManager.get_bulk for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'" ) async def _fetch_all_construction_data_bulk_async(): tasks = [ self._fetch_asset_construction_data_recursive_async( AssetUri(u) if isinstance(u, str) else u, - store, + stores_list, set(), depth, ) @@ -453,9 +575,13 @@ class AssetManager: try: logger.debug("GetBulk: Starting bulk data fetching") - all_construction_data_list = asyncio.run(_fetch_all_construction_data_bulk_async()) + all_construction_data_list = asyncio.run( + _fetch_all_construction_data_bulk_async() + ) logger.debug("GetBulk: bulk data fetching completed") - except Exception as e: # Should ideally not happen if gather returns exceptions + except ( + Exception + ) as e: # Should ideally not happen if gather returns exceptions logger.error( f"GetBulk: Unexpected error during asyncio.run for bulk data: {e}", exc_info=False, @@ -474,11 +600,21 @@ class AssetManager: raise data_or_exc elif isinstance(data_or_exc, _AssetConstructionData): # Build asset instance synchronously. Exceptions during build should propagate. - assets.append( - self._build_asset_tree_from_data_sync(data_or_exc, store_name_for_cache=store) + # Use the first store from the list for caching purposes in build_asset_tree + store_name_for_cache = ( + stores_list[0] if stores_list else "local" + ) + assets.append( + self._build_asset_tree_from_data_sync( + data_or_exc, store_name_for_cache=store_name_for_cache + ) + ) + elif ( + data_or_exc is None + ): # From _fetch_... returning None for not found + logger.debug( + f"GetBulk: Asset '{original_uri_input}' not found" ) - elif data_or_exc is None: # From _fetch_... returning None for not found - logger.debug(f"GetBulk: Asset '{original_uri_input}' not found") assets.append(None) else: # Should not happen logger.error( @@ -493,28 +629,43 @@ class AssetManager: async def get_bulk_async( self, uris: Sequence[Union[AssetUri, str]], - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> List[Any]: """Retrieves multiple assets by their URIs (asynchronous), to a specified depth.""" + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"AssetManager.get_bulk_async for {len(uris)} URIs from store '{store}', depth '{depth}'" + f"AssetManager.get_bulk_async for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'" ) tasks = [ self._fetch_asset_construction_data_recursive_async( - AssetUri(u) if isinstance(u, str) else u, store, set(), depth + AssetUri(u) if isinstance(u, str) else u, + stores_list, + set(), + depth, ) for u in uris ] - all_construction_data_list = await asyncio.gather(*tasks, return_exceptions=True) + all_construction_data_list = await asyncio.gather( + *tasks, return_exceptions=True + ) assets = [] for i, data_or_exc in enumerate(all_construction_data_list): if isinstance(data_or_exc, _AssetConstructionData): - assets.append( - self._build_asset_tree_from_data_sync(data_or_exc, store_name_for_cache=store) + # Use the first store from the list for caching purposes in build_asset_tree + store_name_for_cache = ( + stores_list[0] if stores_list else "local" ) - elif isinstance(data_or_exc, FileNotFoundError) or data_or_exc is None: + assets.append( + self._build_asset_tree_from_data_sync( + data_or_exc, store_name_for_cache=store_name_for_cache + ) + ) + elif ( + isinstance(data_or_exc, FileNotFoundError) + or data_or_exc is None + ): assets.append(None) elif isinstance(data_or_exc, Exception): assets.append(data_or_exc) # Caller must check @@ -523,25 +674,47 @@ class AssetManager: def exists( self, uri: Union[AssetUri, str], - store: str = "local", + store: Union[str, Sequence[str]] = "local", ) -> bool: """ - Returns True if the asset exists, False otherwise. + Returns True if the asset exists in any of the specified stores, False otherwise. """ - async def _exists_async(): + async def _exists_async(stores_list: Sequence[str]): asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri logger.debug( - f"ExistsAsync (internal): Looking up store '{store}'. Available stores: {list(self.stores.keys())}" + f"ExistsAsync (internal): Trying stores '{stores_list}'. Available stores: {list(self.stores.keys())}" ) - try: - selected_store = self.stores[store] - except KeyError: - raise ValueError(f"No store registered for name: {store}") - return await selected_store.exists(asset_uri_obj) + for current_store_name in stores_list: + store = self.stores.get(current_store_name) + if not store: + logger.warning( + f"Store '{current_store_name}' not registered. Skipping." + ) + continue + try: + exists = await store.exists(asset_uri_obj) + if exists: + logger.debug( + f"ExistsAsync: Asset {asset_uri_obj} found in store {current_store_name}" + ) + return True + else: + logger.debug( + f"ExistsAsync: Asset {asset_uri_obj} not found in store {current_store_name}" + ) + continue + except Exception as e: + logger.error( + f"ExistsAsync: Error checking store '{current_store_name}': {e}", + exc_info=False, + ) + continue # Try next store + return False # Not found in any store + stores_list = [store] if isinstance(store, str) else store try: - return asyncio.run(_exists_async()) + return asyncio.run(_exists_async(stores_list)) except Exception as e: logger.error( f"AssetManager.exists: Error during asyncio.run for '{uri}': {e}", @@ -554,15 +727,22 @@ class AssetManager: asset_type: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> List[Asset]: """Fetches asset instances based on type, limit, and offset (synchronous), to a specified depth.""" - logger.debug(f"Fetch(type='{asset_type}', store='{store}', depth='{depth}')") - asset_uris = self.list_assets( - asset_type, limit, offset, store - ) # list_assets doesn't need depth - results = self.get_bulk(asset_uris, store, depth) # Pass depth to get_bulk + stores_list = [store] if isinstance(store, str) else store + logger.debug( + f"Fetch(type='{asset_type}', stores='{stores_list}', depth='{depth}')" + ) + # Note: list_assets currently only supports a single store. + # If fetching from multiple stores is needed for listing, this needs + # to be updated. For now, list from the first store. + list_store = stores_list[0] if stores_list else "local" + asset_uris = self.list_assets(asset_type, limit, offset, list_store) + results = self.get_bulk( + asset_uris, stores_list, depth + ) # Pass stores_list to get_bulk # Filter out non-Asset objects (e.g., None for not found, or exceptions if collected) return [asset for asset in results if isinstance(asset, Asset)] @@ -571,17 +751,24 @@ class AssetManager: asset_type: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", depth: Optional[int] = None, ) -> List[Asset]: """Fetches asset instances based on type, limit, and offset (asynchronous), to a specified depth.""" - logger.debug(f"FetchAsync(type='{asset_type}', store='{store}', depth='{depth}')") + stores_list = [store] if isinstance(store, str) else store + logger.debug( + f"FetchAsync(type='{asset_type}', stores='{stores_list}', depth='{depth}')" + ) + # Note: list_assets_async currently only supports a single store. + # If fetching from multiple stores is needed for listing, this needs + # to be updated. For now, list from the first store. + list_store = stores_list[0] if stores_list else "local" asset_uris = await self.list_assets_async( - asset_type, limit, offset, store # list_assets_async doesn't need depth + asset_type, limit, offset, list_store ) results = await self.get_bulk_async( - asset_uris, store, depth - ) # Pass depth to get_bulk_async + asset_uris, stores_list, depth + ) # Pass stores_list to get_bulk_async return [asset for asset in results if isinstance(asset, Asset)] def list_assets( @@ -589,49 +776,79 @@ class AssetManager: asset_type: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", ) -> List[AssetUri]: - logger.debug(f"ListAssets(type='{asset_type}', store='{store}')") - return asyncio.run(self.list_assets_async(asset_type, limit, offset, store)) + stores_list = [store] if isinstance(store, str) else store + logger.debug( + f"ListAssets(type='{asset_type}', stores='{stores_list}')" + ) + # Note: list_assets_async currently only supports a single store. + # If listing from multiple stores is needed, this needs to be updated. + # For now, list from the first store. + list_store = stores_list[0] if stores_list else "local" + return asyncio.run( + self.list_assets_async(asset_type, limit, offset, list_store) + ) async def list_assets_async( self, asset_type: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", ) -> List[AssetUri]: - logger.debug(f"ListAssetsAsync executing for type='{asset_type}', store='{store}'") + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"ListAssetsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}" + f"ListAssetsAsync executing for type='{asset_type}', stores='{stores_list}'" + ) + # Note: list_assets_async currently only supports a single store. + # If listing from multiple stores is needed, this needs to be updated. + # For now, list from the first store. + list_store = stores_list[0] if stores_list else "local" + logger.debug( + f"ListAssetsAsync: Looking up store '{list_store}'. Available stores: {list(self.stores.keys())}" ) try: - selected_store = self.stores[store] + selected_store = self.stores[list_store] except KeyError: - raise ValueError(f"No store registered for name: {store}") + raise ValueError(f"No store registered for name: {list_store}") return await selected_store.list_assets(asset_type, limit, offset) def count_assets( self, asset_type: Optional[str] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", ) -> int: - logger.debug(f"CountAssets(type='{asset_type}', store='{store}')") - return asyncio.run(self.count_assets_async(asset_type, store)) + stores_list = [store] if isinstance(store, str) else store + logger.debug( + f"CountAssets(type='{asset_type}', stores='{stores_list}')" + ) + # Note: count_assets_async currently only supports a single store. + # If counting across multiple stores is needed, this needs to be updated. + # For now, count from the first store. + count_store = stores_list[0] if stores_list else "local" + return asyncio.run(self.count_assets_async(asset_type, count_store)) async def count_assets_async( self, asset_type: Optional[str] = None, - store: str = "local", + store: Union[str, Sequence[str]] = "local", ) -> int: - logger.debug(f"CountAssetsAsync executing for type='{asset_type}', store='{store}'") + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"CountAssetsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}" + f"CountAssetsAsync executing for type='{asset_type}', stores='{stores_list}'" + ) + # Note: count_assets_async currently only supports a single store. + # If counting across multiple stores is needed, this needs to be updated. + # For now, count from the first store. + count_store = stores_list[0] if stores_list else "local" + logger.debug( + f"CountAssetsAsync: Looking up store '{count_store}'. Available stores: {list(self.stores.keys())}" ) try: - selected_store = self.stores[store] + selected_store = self.stores[count_store] except KeyError: - raise ValueError(f"No store registered for name: {store}") + raise ValueError(f"No store registered for name: {count_store}") return await selected_store.count_assets(asset_type) def _is_registered_type(self, obj: Asset) -> bool: @@ -646,14 +863,20 @@ class AssetManager: Adds an asset to the store, either creating a new one or updating an existing one. Uses obj.get_url() to determine if the asset exists. """ - logger.debug(f"AddAsync: Adding {type(obj).__name__} to store '{store}'") + logger.debug( + f"AddAsync: Adding {type(obj).__name__} to store '{store}'" + ) uri = obj.get_uri() if not self._is_registered_type(obj): - logger.warning(f"Asset has unregistered type '{uri.asset_type}' ({type(obj).__name__})") + logger.warning( + f"Asset has unregistered type '{uri.asset_type}' ({type(obj).__name__})" + ) serializer = self.get_serializer_for_class(obj.__class__) data = obj.to_bytes(serializer) - return await self.add_raw_async(uri.asset_type, uri.asset_id, data, store) + return await self.add_raw_async( + uri.asset_type, uri.asset_id, data, store + ) def add(self, obj: Asset, store: str = "local") -> AssetUri: """Synchronous wrapper for adding an asset to the store.""" @@ -668,9 +891,13 @@ class AssetManager: """ Adds raw asset data to the store, either creating a new asset or updating an existing one. """ - logger.debug(f"AddRawAsync: type='{asset_type}', id='{asset_id}', store='{store}'") + logger.debug( + f"AddRawAsync: type='{asset_type}', id='{asset_id}', store='{store}'" + ) if not asset_type or not asset_id: - raise ValueError("asset_type and asset_id must be provided for add_raw.") + raise ValueError( + "asset_type and asset_id must be provided for add_raw." + ) if not isinstance(data, bytes): raise TypeError("Data for add_raw must be bytes.") selected_store = self.stores.get(store) @@ -687,7 +914,9 @@ class AssetManager: uri = await selected_store.create(asset_type, asset_id, data) if store in self._cacheable_stores: - self.asset_cache.invalidate_for_uri(str(uri)) # Invalidate after add/update + self.asset_cache.invalidate_for_uri( + str(uri) + ) # Invalidate after add/update return uri def add_raw( @@ -698,10 +927,13 @@ class AssetManager: f"AddRaw: type='{asset_type}', id='{asset_id}', store='{store}' from T:{threading.current_thread().name}" ) try: - return asyncio.run(self.add_raw_async(asset_type, asset_id, data, store)) + return asyncio.run( + self.add_raw_async(asset_type, asset_id, data, store) + ) except Exception as e: logger.error( - f"AddRaw: Error for type='{asset_type}', id='{asset_id}': {e}", exc_info=False + f"AddRaw: Error for type='{asset_type}', id='{asset_id}': {e}", + exc_info=False, ) raise @@ -716,7 +948,9 @@ class AssetManager: Convenience wrapper around add_raw(). If asset_id is None, the path.stem is used as the id. """ - return self.add_raw(asset_type, asset_id or path.stem, path.read_bytes(), store=store) + return self.add_raw( + asset_type, asset_id or path.stem, path.read_bytes(), store=store + ) def delete(self, uri: Union[AssetUri, str], store: str = "local") -> None: logger.debug(f"Delete URI '{uri}' from store '{store}'") @@ -730,7 +964,9 @@ class AssetManager: asyncio.run(_do_delete_async()) - async def delete_async(self, uri: Union[AssetUri, str], store: str = "local") -> None: + async def delete_async( + self, uri: Union[AssetUri, str], store: str = "local" + ) -> None: logger.debug(f"DeleteAsync URI '{uri}' from store '{store}'") asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri selected_store = self.stores[store] @@ -738,7 +974,9 @@ class AssetManager: if store in self._cacheable_stores: self.asset_cache.invalidate_for_uri(str(asset_uri_obj)) - async def is_empty_async(self, asset_type: Optional[str] = None, store: str = "local") -> bool: + async def is_empty_async( + self, asset_type: Optional[str] = None, store: str = "local" + ) -> bool: """Checks if the asset store has any assets of a given type (asynchronous).""" logger.debug(f"IsEmptyAsync: type='{asset_type}', store='{store}'") logger.debug( @@ -749,7 +987,9 @@ class AssetManager: raise ValueError(f"No store registered for name: {store}") return await selected_store.is_empty(asset_type) - def is_empty(self, asset_type: Optional[str] = None, store: str = "local") -> bool: + def is_empty( + self, asset_type: Optional[str] = None, store: str = "local" + ) -> bool: """Checks if the asset store has any assets of a given type (synchronous wrapper).""" logger.debug( f"IsEmpty: type='{asset_type}', store='{store}' from T:{threading.current_thread().name}" @@ -764,32 +1004,44 @@ class AssetManager: raise async def list_versions_async( - self, uri: Union[AssetUri, str], store: str = "local" + self, + uri: Union[AssetUri, str], + store: Union[str, Sequence[str]] = "local", ) -> List[AssetUri]: """Lists available versions for a given asset URI (asynchronous).""" - logger.debug(f"ListVersionsAsync: uri='{uri}', store='{store}'") + stores_list = [store] if isinstance(store, str) else store + logger.debug(f"ListVersionsAsync: uri='{uri}', stores='{stores_list}'") asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri + # Note: list_versions_async currently only supports a single store. + # If listing versions across multiple stores is needed, this needs to be updated. + # For now, list from the first store. + list_store = stores_list[0] if stores_list else "local" logger.debug( - f"ListVersionsAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}" + f"ListVersionsAsync: Looking up store '{list_store}'. Available stores: {list(self.stores.keys())}" ) - selected_store = self.stores.get(store) + selected_store = self.stores.get(list_store) if not selected_store: - raise ValueError(f"No store registered for name: {store}") + raise ValueError(f"No store registered for name: {list_store}") return await selected_store.list_versions(asset_uri_obj) - def list_versions(self, uri: Union[AssetUri, str], store: str = "local") -> List[AssetUri]: + def list_versions( + self, + uri: Union[AssetUri, str], + store: Union[str, Sequence[str]] = "local", + ) -> List[AssetUri]: """Lists available versions for a given asset URI (synchronous wrapper).""" + stores_list = [store] if isinstance(store, str) else store logger.debug( - f"ListVersions: uri='{uri}', store='{store}' from T:{threading.current_thread().name}" + f"ListVersions: uri='{uri}', stores='{stores_list}' from T:{threading.current_thread().name}" ) try: - return asyncio.run(self.list_versions_async(uri, store)) + return asyncio.run(self.list_versions_async(uri, stores_list)) except Exception as e: logger.error( - f"ListVersions: Error for uri='{uri}', store='{store}': {e}", + f"ListVersions: Error for uri='{uri}', stores='{stores_list}': {e}", exc_info=False, - ) # Changed exc_info to False + ) return [] # Return empty list on error to satisfy type hint def get_registered_asset_types(self) -> List[str]: