Files
silo-mod/freecad/silo_upload_queue.py
forbes-0023 c5f00219fa feat(silo): async save queue — background upload with coalescing (#392)
Local save is synchronous (data safe on disk before returning).
Network upload (file, DAG, BOM) runs in a background QThread.

New files:
- silo_upload_queue.py: UploadTask dataclass + UploadWorker(QThread)
  with coalescing, cancel, shutdown, and Qt signals
- silo_status_widget.py: SyncStatusWidget for status bar feedback

Modified:
- silo_commands.py: _extract_dag_data, _extract_bom_data, _enqueue_upload
  helpers; Silo_Save and Silo_Commit use async enqueue
- silo_origin.py: saveDocument() saves locally then enqueues async upload
- InitGui.py: upload queue init at 700ms, shutdown handler via atexit

Thread safety: worker never accesses App.Document or Qt widgets;
all data pre-extracted on main thread as plain dicts/strings.
Queue uses threading.Lock; signals cross thread boundary via Qt.
2026-03-05 11:08:44 -06:00

251 lines
8.3 KiB
Python

"""Async upload queue for Silo PLM.
Provides a background ``QThread`` that processes file uploads, DAG syncs,
and BOM syncs without blocking the main UI thread. All data required for
an upload is captured on the main thread before enqueuing — the worker
never accesses ``App.Document``, ``Gui.Document``, or any Qt widget.
Typical usage from ``SiloOrigin.saveDocument``::
task = UploadTask(
doc_name=doc.Name,
part_number=obj.SiloPartNumber,
file_path=str(saved_path),
properties=collected_props,
comment="",
dag_data=extracted_dag,
bom_data=extracted_bom,
)
get_upload_queue().enqueue(task)
"""
import collections
import dataclasses
import threading
import time
import FreeCAD
from PySide6.QtCore import QThread, Signal
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class UploadTask:
"""Immutable snapshot of everything needed to upload a document revision.
All fields are plain Python types — no FreeCAD or Qt objects.
"""
doc_name: str
"""FreeCAD document internal name (used for coalescing)."""
part_number: str
"""Silo part number."""
file_path: str
"""Absolute path to the saved ``.kc`` file on disk."""
properties: dict
"""Pre-collected document properties dict."""
comment: str = ""
"""Revision comment (empty for auto-save)."""
dag_data: tuple | None = None
"""Pre-extracted ``(nodes, edges)`` or ``None``."""
bom_data: object | None = None
"""Pre-extracted BOM payload or ``None``."""
timestamp: float = dataclasses.field(default_factory=time.time)
"""Time the task was created (``time.time()``)."""
# ---------------------------------------------------------------------------
# Worker thread
# ---------------------------------------------------------------------------
class UploadWorker(QThread):
"""Background thread that processes :class:`UploadTask` items serially.
Signals are emitted on completion so the main thread can update the UI.
The worker never touches FreeCAD documents or Qt widgets.
"""
upload_started = Signal(str, str) # doc_name, part_number
upload_finished = Signal(str, str, int) # doc_name, part_number, revision
upload_failed = Signal(str, str, str) # doc_name, part_number, error
queue_empty = Signal()
def __init__(self, parent=None):
super().__init__(parent)
self._queue: collections.deque[UploadTask] = collections.deque()
self._lock = threading.Lock()
self._event = threading.Event()
self._stop = False
# -- public API (called from main thread) --------------------------------
def enqueue(self, task: UploadTask) -> None:
"""Add a task, coalescing any pending task for the same document."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != task.doc_name)
self._queue.append(task)
self._event.set()
def cancel(self, doc_name: str) -> None:
"""Remove any pending (not yet started) task for *doc_name*."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != doc_name)
@property
def pending_count(self) -> int:
"""Number of tasks waiting to be processed."""
with self._lock:
return len(self._queue)
def shutdown(self) -> None:
"""Signal the worker to stop and wait up to 5 s for it to finish."""
self._stop = True
self._event.set()
self.wait(5000)
# -- thread entry point --------------------------------------------------
def run(self) -> None: # noqa: D401 — Qt override
"""Process tasks until :meth:`shutdown` is called."""
from silo_commands import _client, _update_manifest_revision
while not self._stop:
self._event.wait(timeout=1.0)
self._event.clear()
while not self._stop:
task = self._pop_task()
if task is None:
break
self._process(task, _client)
if not self._stop and self.pending_count == 0:
self.queue_empty.emit()
# -- internals -----------------------------------------------------------
def _pop_task(self) -> UploadTask | None:
with self._lock:
return self._queue.popleft() if self._queue else None
def _process(self, task: UploadTask, client) -> None:
self.upload_started.emit(task.doc_name, task.part_number)
try:
result = client._upload_file(
task.part_number, task.file_path, task.properties, task.comment
)
rev = result.get("revision_number", 0)
# DAG sync (non-critical)
if task.dag_data:
try:
nodes, edges = task.dag_data
client.push_dag(task.part_number, rev, nodes, edges)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] DAG sync failed for {task.part_number}: {exc}\n"
)
# BOM sync (non-critical)
if task.bom_data:
try:
_push_bom_from_extracted(task.part_number, task.bom_data, client)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] BOM sync failed for {task.part_number}: {exc}\n"
)
# Manifest revision update (local file I/O, safe from worker)
try:
from silo_commands import _update_manifest_revision
_update_manifest_revision(task.file_path, rev)
except Exception:
pass
self.upload_finished.emit(task.doc_name, task.part_number, rev)
FreeCAD.Console.PrintMessage(
f"[upload-queue] Uploaded {task.part_number} as revision {rev}\n"
)
except Exception as exc:
self.upload_failed.emit(task.doc_name, task.part_number, str(exc))
FreeCAD.Console.PrintWarning(
f"[upload-queue] Upload failed for {task.part_number}: {exc}\n"
)
def _push_bom_from_extracted(part_number: str, bom_data, client) -> None:
"""Diff and apply pre-extracted BOM entries against the server.
*bom_data* is a list of resolved ``bom_sync.BomEntry`` dataclasses
(with ``part_number`` populated) produced by
:func:`silo_commands._extract_bom_data` on the main thread.
This function fetches the current server BOM, diffs it against the
local entries, and applies adds/quantity updates. No FreeCAD
document access is needed — safe to call from the worker thread.
"""
if not bom_data:
return
from bom_sync import apply_bom_diff, diff_bom
try:
remote = client.get_bom(part_number)
except Exception:
remote = []
diff = diff_bom(bom_data, remote)
result = apply_bom_diff(diff, part_number, client)
parts = []
if result.added_count:
parts.append(f"+{result.added_count} added")
if result.updated_count:
parts.append(f"~{result.updated_count} qty updated")
if parts:
FreeCAD.Console.PrintMessage(
f"[upload-queue] BOM synced for {part_number}: {', '.join(parts)}\n"
)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_upload_queue: UploadWorker | None = None
def get_upload_queue() -> UploadWorker:
"""Return the global upload queue, creating it on first call."""
global _upload_queue
if _upload_queue is None:
_upload_queue = UploadWorker()
_upload_queue.start()
return _upload_queue
def shutdown_upload_queue() -> None:
"""Stop the global upload queue if running."""
global _upload_queue
if _upload_queue is not None:
count = _upload_queue.pending_count
if count:
FreeCAD.Console.PrintWarning(
f"[upload-queue] Waiting for {count} pending upload(s)...\n"
)
_upload_queue.shutdown()
_upload_queue = None