"""Async upload queue for Silo PLM. Provides a background ``QThread`` that processes file uploads, DAG syncs, and BOM syncs without blocking the main UI thread. All data required for an upload is captured on the main thread before enqueuing — the worker never accesses ``App.Document``, ``Gui.Document``, or any Qt widget. Typical usage from ``SiloOrigin.saveDocument``:: task = UploadTask( doc_name=doc.Name, part_number=obj.SiloPartNumber, file_path=str(saved_path), properties=collected_props, comment="", dag_data=extracted_dag, bom_data=extracted_bom, ) get_upload_queue().enqueue(task) """ import collections import dataclasses import threading import time import FreeCAD from PySide6.QtCore import QThread, Signal # --------------------------------------------------------------------------- # Data structures # --------------------------------------------------------------------------- @dataclasses.dataclass class UploadTask: """Immutable snapshot of everything needed to upload a document revision. All fields are plain Python types — no FreeCAD or Qt objects. """ doc_name: str """FreeCAD document internal name (used for coalescing).""" part_number: str """Silo part number.""" file_path: str """Absolute path to the saved ``.kc`` file on disk.""" properties: dict """Pre-collected document properties dict.""" comment: str = "" """Revision comment (empty for auto-save).""" dag_data: tuple | None = None """Pre-extracted ``(nodes, edges)`` or ``None``.""" bom_data: object | None = None """Pre-extracted BOM payload or ``None``.""" timestamp: float = dataclasses.field(default_factory=time.time) """Time the task was created (``time.time()``).""" # --------------------------------------------------------------------------- # Worker thread # --------------------------------------------------------------------------- class UploadWorker(QThread): """Background thread that processes :class:`UploadTask` items serially. Signals are emitted on completion so the main thread can update the UI. The worker never touches FreeCAD documents or Qt widgets. """ upload_started = Signal(str, str) # doc_name, part_number upload_finished = Signal(str, str, int) # doc_name, part_number, revision upload_failed = Signal(str, str, str) # doc_name, part_number, error queue_empty = Signal() def __init__(self, parent=None): super().__init__(parent) self._queue: collections.deque[UploadTask] = collections.deque() self._lock = threading.Lock() self._event = threading.Event() self._stop = False # -- public API (called from main thread) -------------------------------- def enqueue(self, task: UploadTask) -> None: """Add a task, coalescing any pending task for the same document.""" with self._lock: self._queue = collections.deque(t for t in self._queue if t.doc_name != task.doc_name) self._queue.append(task) self._event.set() def cancel(self, doc_name: str) -> None: """Remove any pending (not yet started) task for *doc_name*.""" with self._lock: self._queue = collections.deque(t for t in self._queue if t.doc_name != doc_name) @property def pending_count(self) -> int: """Number of tasks waiting to be processed.""" with self._lock: return len(self._queue) def shutdown(self) -> None: """Signal the worker to stop and wait up to 5 s for it to finish.""" self._stop = True self._event.set() self.wait(5000) # -- thread entry point -------------------------------------------------- def run(self) -> None: # noqa: D401 — Qt override """Process tasks until :meth:`shutdown` is called.""" from silo_commands import _client, _update_manifest_revision while not self._stop: self._event.wait(timeout=1.0) self._event.clear() while not self._stop: task = self._pop_task() if task is None: break self._process(task, _client) if not self._stop and self.pending_count == 0: self.queue_empty.emit() # -- internals ----------------------------------------------------------- def _pop_task(self) -> UploadTask | None: with self._lock: return self._queue.popleft() if self._queue else None def _process(self, task: UploadTask, client) -> None: self.upload_started.emit(task.doc_name, task.part_number) try: result = client._upload_file( task.part_number, task.file_path, task.properties, task.comment ) rev = result.get("revision_number", 0) # DAG sync (non-critical) if task.dag_data: try: nodes, edges = task.dag_data client.push_dag(task.part_number, rev, nodes, edges) except Exception as exc: FreeCAD.Console.PrintWarning( f"[upload-queue] DAG sync failed for {task.part_number}: {exc}\n" ) # BOM sync (non-critical) if task.bom_data: try: _push_bom_from_extracted(task.part_number, task.bom_data, client) except Exception as exc: FreeCAD.Console.PrintWarning( f"[upload-queue] BOM sync failed for {task.part_number}: {exc}\n" ) # Manifest revision update (local file I/O, safe from worker) try: from silo_commands import _update_manifest_revision _update_manifest_revision(task.file_path, rev) except Exception: pass self.upload_finished.emit(task.doc_name, task.part_number, rev) FreeCAD.Console.PrintMessage( f"[upload-queue] Uploaded {task.part_number} as revision {rev}\n" ) except Exception as exc: self.upload_failed.emit(task.doc_name, task.part_number, str(exc)) FreeCAD.Console.PrintWarning( f"[upload-queue] Upload failed for {task.part_number}: {exc}\n" ) def _push_bom_from_extracted(part_number: str, bom_data, client) -> None: """Diff and apply pre-extracted BOM entries against the server. *bom_data* is a list of resolved ``bom_sync.BomEntry`` dataclasses (with ``part_number`` populated) produced by :func:`silo_commands._extract_bom_data` on the main thread. This function fetches the current server BOM, diffs it against the local entries, and applies adds/quantity updates. No FreeCAD document access is needed — safe to call from the worker thread. """ if not bom_data: return from bom_sync import apply_bom_diff, diff_bom try: remote = client.get_bom(part_number) except Exception: remote = [] diff = diff_bom(bom_data, remote) result = apply_bom_diff(diff, part_number, client) parts = [] if result.added_count: parts.append(f"+{result.added_count} added") if result.updated_count: parts.append(f"~{result.updated_count} qty updated") if parts: FreeCAD.Console.PrintMessage( f"[upload-queue] BOM synced for {part_number}: {', '.join(parts)}\n" ) # --------------------------------------------------------------------------- # Module-level singleton # --------------------------------------------------------------------------- _upload_queue: UploadWorker | None = None def get_upload_queue() -> UploadWorker: """Return the global upload queue, creating it on first call.""" global _upload_queue if _upload_queue is None: _upload_queue = UploadWorker() _upload_queue.start() return _upload_queue def shutdown_upload_queue() -> None: """Stop the global upload queue if running.""" global _upload_queue if _upload_queue is not None: count = _upload_queue.pending_count if count: FreeCAD.Console.PrintWarning( f"[upload-queue] Waiting for {count} pending upload(s)...\n" ) _upload_queue.shutdown() _upload_queue = None