Local save is synchronous (data safe on disk before returning). Network upload (file, DAG, BOM) runs in a background QThread. New files: - silo_upload_queue.py: UploadTask dataclass + UploadWorker(QThread) with coalescing, cancel, shutdown, and Qt signals - silo_status_widget.py: SyncStatusWidget for status bar feedback Modified: - silo_commands.py: _extract_dag_data, _extract_bom_data, _enqueue_upload helpers; Silo_Save and Silo_Commit use async enqueue - silo_origin.py: saveDocument() saves locally then enqueues async upload - InitGui.py: upload queue init at 700ms, shutdown handler via atexit Thread safety: worker never accesses App.Document or Qt widgets; all data pre-extracted on main thread as plain dicts/strings. Queue uses threading.Lock; signals cross thread boundary via Qt.
251 lines
8.3 KiB
Python
251 lines
8.3 KiB
Python
"""Async upload queue for Silo PLM.
|
|
|
|
Provides a background ``QThread`` that processes file uploads, DAG syncs,
|
|
and BOM syncs without blocking the main UI thread. All data required for
|
|
an upload is captured on the main thread before enqueuing — the worker
|
|
never accesses ``App.Document``, ``Gui.Document``, or any Qt widget.
|
|
|
|
Typical usage from ``SiloOrigin.saveDocument``::
|
|
|
|
task = UploadTask(
|
|
doc_name=doc.Name,
|
|
part_number=obj.SiloPartNumber,
|
|
file_path=str(saved_path),
|
|
properties=collected_props,
|
|
comment="",
|
|
dag_data=extracted_dag,
|
|
bom_data=extracted_bom,
|
|
)
|
|
get_upload_queue().enqueue(task)
|
|
"""
|
|
|
|
import collections
|
|
import dataclasses
|
|
import threading
|
|
import time
|
|
|
|
import FreeCAD
|
|
from PySide6.QtCore import QThread, Signal
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data structures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class UploadTask:
|
|
"""Immutable snapshot of everything needed to upload a document revision.
|
|
|
|
All fields are plain Python types — no FreeCAD or Qt objects.
|
|
"""
|
|
|
|
doc_name: str
|
|
"""FreeCAD document internal name (used for coalescing)."""
|
|
|
|
part_number: str
|
|
"""Silo part number."""
|
|
|
|
file_path: str
|
|
"""Absolute path to the saved ``.kc`` file on disk."""
|
|
|
|
properties: dict
|
|
"""Pre-collected document properties dict."""
|
|
|
|
comment: str = ""
|
|
"""Revision comment (empty for auto-save)."""
|
|
|
|
dag_data: tuple | None = None
|
|
"""Pre-extracted ``(nodes, edges)`` or ``None``."""
|
|
|
|
bom_data: object | None = None
|
|
"""Pre-extracted BOM payload or ``None``."""
|
|
|
|
timestamp: float = dataclasses.field(default_factory=time.time)
|
|
"""Time the task was created (``time.time()``)."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Worker thread
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class UploadWorker(QThread):
|
|
"""Background thread that processes :class:`UploadTask` items serially.
|
|
|
|
Signals are emitted on completion so the main thread can update the UI.
|
|
The worker never touches FreeCAD documents or Qt widgets.
|
|
"""
|
|
|
|
upload_started = Signal(str, str) # doc_name, part_number
|
|
upload_finished = Signal(str, str, int) # doc_name, part_number, revision
|
|
upload_failed = Signal(str, str, str) # doc_name, part_number, error
|
|
queue_empty = Signal()
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
self._queue: collections.deque[UploadTask] = collections.deque()
|
|
self._lock = threading.Lock()
|
|
self._event = threading.Event()
|
|
self._stop = False
|
|
|
|
# -- public API (called from main thread) --------------------------------
|
|
|
|
def enqueue(self, task: UploadTask) -> None:
|
|
"""Add a task, coalescing any pending task for the same document."""
|
|
with self._lock:
|
|
self._queue = collections.deque(t for t in self._queue if t.doc_name != task.doc_name)
|
|
self._queue.append(task)
|
|
self._event.set()
|
|
|
|
def cancel(self, doc_name: str) -> None:
|
|
"""Remove any pending (not yet started) task for *doc_name*."""
|
|
with self._lock:
|
|
self._queue = collections.deque(t for t in self._queue if t.doc_name != doc_name)
|
|
|
|
@property
|
|
def pending_count(self) -> int:
|
|
"""Number of tasks waiting to be processed."""
|
|
with self._lock:
|
|
return len(self._queue)
|
|
|
|
def shutdown(self) -> None:
|
|
"""Signal the worker to stop and wait up to 5 s for it to finish."""
|
|
self._stop = True
|
|
self._event.set()
|
|
self.wait(5000)
|
|
|
|
# -- thread entry point --------------------------------------------------
|
|
|
|
def run(self) -> None: # noqa: D401 — Qt override
|
|
"""Process tasks until :meth:`shutdown` is called."""
|
|
from silo_commands import _client, _update_manifest_revision
|
|
|
|
while not self._stop:
|
|
self._event.wait(timeout=1.0)
|
|
self._event.clear()
|
|
|
|
while not self._stop:
|
|
task = self._pop_task()
|
|
if task is None:
|
|
break
|
|
self._process(task, _client)
|
|
|
|
if not self._stop and self.pending_count == 0:
|
|
self.queue_empty.emit()
|
|
|
|
# -- internals -----------------------------------------------------------
|
|
|
|
def _pop_task(self) -> UploadTask | None:
|
|
with self._lock:
|
|
return self._queue.popleft() if self._queue else None
|
|
|
|
def _process(self, task: UploadTask, client) -> None:
|
|
self.upload_started.emit(task.doc_name, task.part_number)
|
|
try:
|
|
result = client._upload_file(
|
|
task.part_number, task.file_path, task.properties, task.comment
|
|
)
|
|
rev = result.get("revision_number", 0)
|
|
|
|
# DAG sync (non-critical)
|
|
if task.dag_data:
|
|
try:
|
|
nodes, edges = task.dag_data
|
|
client.push_dag(task.part_number, rev, nodes, edges)
|
|
except Exception as exc:
|
|
FreeCAD.Console.PrintWarning(
|
|
f"[upload-queue] DAG sync failed for {task.part_number}: {exc}\n"
|
|
)
|
|
|
|
# BOM sync (non-critical)
|
|
if task.bom_data:
|
|
try:
|
|
_push_bom_from_extracted(task.part_number, task.bom_data, client)
|
|
except Exception as exc:
|
|
FreeCAD.Console.PrintWarning(
|
|
f"[upload-queue] BOM sync failed for {task.part_number}: {exc}\n"
|
|
)
|
|
|
|
# Manifest revision update (local file I/O, safe from worker)
|
|
try:
|
|
from silo_commands import _update_manifest_revision
|
|
|
|
_update_manifest_revision(task.file_path, rev)
|
|
except Exception:
|
|
pass
|
|
|
|
self.upload_finished.emit(task.doc_name, task.part_number, rev)
|
|
FreeCAD.Console.PrintMessage(
|
|
f"[upload-queue] Uploaded {task.part_number} as revision {rev}\n"
|
|
)
|
|
|
|
except Exception as exc:
|
|
self.upload_failed.emit(task.doc_name, task.part_number, str(exc))
|
|
FreeCAD.Console.PrintWarning(
|
|
f"[upload-queue] Upload failed for {task.part_number}: {exc}\n"
|
|
)
|
|
|
|
|
|
def _push_bom_from_extracted(part_number: str, bom_data, client) -> None:
|
|
"""Diff and apply pre-extracted BOM entries against the server.
|
|
|
|
*bom_data* is a list of resolved ``bom_sync.BomEntry`` dataclasses
|
|
(with ``part_number`` populated) produced by
|
|
:func:`silo_commands._extract_bom_data` on the main thread.
|
|
|
|
This function fetches the current server BOM, diffs it against the
|
|
local entries, and applies adds/quantity updates. No FreeCAD
|
|
document access is needed — safe to call from the worker thread.
|
|
"""
|
|
if not bom_data:
|
|
return
|
|
from bom_sync import apply_bom_diff, diff_bom
|
|
|
|
try:
|
|
remote = client.get_bom(part_number)
|
|
except Exception:
|
|
remote = []
|
|
|
|
diff = diff_bom(bom_data, remote)
|
|
result = apply_bom_diff(diff, part_number, client)
|
|
|
|
parts = []
|
|
if result.added_count:
|
|
parts.append(f"+{result.added_count} added")
|
|
if result.updated_count:
|
|
parts.append(f"~{result.updated_count} qty updated")
|
|
if parts:
|
|
FreeCAD.Console.PrintMessage(
|
|
f"[upload-queue] BOM synced for {part_number}: {', '.join(parts)}\n"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level singleton
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_upload_queue: UploadWorker | None = None
|
|
|
|
|
|
def get_upload_queue() -> UploadWorker:
|
|
"""Return the global upload queue, creating it on first call."""
|
|
global _upload_queue
|
|
if _upload_queue is None:
|
|
_upload_queue = UploadWorker()
|
|
_upload_queue.start()
|
|
return _upload_queue
|
|
|
|
|
|
def shutdown_upload_queue() -> None:
|
|
"""Stop the global upload queue if running."""
|
|
global _upload_queue
|
|
if _upload_queue is not None:
|
|
count = _upload_queue.pending_count
|
|
if count:
|
|
FreeCAD.Console.PrintWarning(
|
|
f"[upload-queue] Waiting for {count} pending upload(s)...\n"
|
|
)
|
|
_upload_queue.shutdown()
|
|
_upload_queue = None
|