Compare commits

...

1 Commits

Author SHA1 Message Date
c5f00219fa feat(silo): async save queue — background upload with coalescing (#392)
Local save is synchronous (data safe on disk before returning).
Network upload (file, DAG, BOM) runs in a background QThread.

New files:
- silo_upload_queue.py: UploadTask dataclass + UploadWorker(QThread)
  with coalescing, cancel, shutdown, and Qt signals
- silo_status_widget.py: SyncStatusWidget for status bar feedback

Modified:
- silo_commands.py: _extract_dag_data, _extract_bom_data, _enqueue_upload
  helpers; Silo_Save and Silo_Commit use async enqueue
- silo_origin.py: saveDocument() saves locally then enqueues async upload
- InitGui.py: upload queue init at 700ms, shutdown handler via atexit

Thread safety: worker never accesses App.Document or Qt widgets;
all data pre-extracted on main thread as plain dicts/strings.
Queue uses threading.Lock; signals cross thread boundary via Qt.
2026-03-05 11:08:44 -06:00
5 changed files with 450 additions and 39 deletions

View File

@@ -251,6 +251,53 @@ def _handle_startup_urls():
handle_kindred_url(arg) handle_kindred_url(arg)
# ---------------------------------------------------------------------------
# Upload queue — background thread for async file uploads
# ---------------------------------------------------------------------------
def _setup_upload_queue():
"""Start the upload queue and register the status bar widget."""
try:
from silo_upload_queue import get_upload_queue
queue = get_upload_queue()
# Status bar widget
try:
from silo_status_widget import SyncStatusWidget
mw = FreeCADGui.getMainWindow()
if mw and mw.statusBar():
widget = SyncStatusWidget(queue, parent=mw.statusBar())
mw.statusBar().addPermanentWidget(widget)
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: status widget skipped: {e}\n")
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: upload queue skipped: {e}\n")
def _shutdown_upload_queue():
"""Drain the upload queue on application exit."""
try:
from silo_upload_queue import shutdown_upload_queue
shutdown_upload_queue()
except Exception:
pass
def _register_shutdown_handler():
"""Connect the upload queue shutdown to application exit."""
try:
import atexit
atexit.register(_shutdown_upload_queue)
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: shutdown handler skipped: {e}\n")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Deferred setup — staggered timers for non-blocking startup # Deferred setup — staggered timers for non-blocking startup
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -259,6 +306,8 @@ from PySide import QtCore as _QtCore
_QtCore.QTimer.singleShot(500, _handle_startup_urls) _QtCore.QTimer.singleShot(500, _handle_startup_urls)
_QtCore.QTimer.singleShot(600, _register_silo_document_observer) _QtCore.QTimer.singleShot(600, _register_silo_document_observer)
_QtCore.QTimer.singleShot(700, _setup_upload_queue)
_QtCore.QTimer.singleShot(800, _register_shutdown_handler)
_QtCore.QTimer.singleShot(2000, _setup_silo_auth_panel) _QtCore.QTimer.singleShot(2000, _setup_silo_auth_panel)
_QtCore.QTimer.singleShot(2500, _register_silo_overlay) _QtCore.QTimer.singleShot(2500, _register_silo_overlay)
_QtCore.QTimer.singleShot(3000, _check_silo_first_start) _QtCore.QTimer.singleShot(3000, _check_silo_first_start)

View File

@@ -920,6 +920,67 @@ def _push_bom_after_upload(doc, part_number, revision_number):
FreeCAD.Console.PrintWarning(f"BOM sync failed: {e}\n") FreeCAD.Console.PrintWarning(f"BOM sync failed: {e}\n")
def _extract_dag_data(doc):
"""Pre-extract DAG data on the main thread for async upload.
Returns ``(nodes, edges)`` or ``None`` if extraction fails or
the document has no meaningful DAG.
"""
try:
from dag import extract_dag
nodes, edges = extract_dag(doc)
return (nodes, edges) if nodes else None
except Exception:
return None
def _extract_bom_data(doc):
"""Pre-extract BOM entries on the main thread for async upload.
Returns a list of resolved :class:`bom_sync.BomEntry` objects
(with ``part_number`` populated) or ``None`` if the document is
not an assembly or has no cross-document links.
Resolution calls ``_client.get_item_by_uuid()`` which is a fast
DB lookup. This must run on the main thread because it accesses
``App.Document`` objects to read ``SiloItemId`` properties.
"""
try:
from bom_sync import extract_bom_entries, resolve_entries
entries = extract_bom_entries(doc)
if not entries:
return None
resolved, _unresolved = resolve_entries(entries, _client)
return resolved if resolved else None
except Exception:
return None
def _enqueue_upload(doc, part_number, file_path, properties, comment=""):
"""Capture all upload data on the main thread and enqueue for async upload.
The caller must have already saved the file to *file_path* and
collected *properties* synchronously.
"""
from silo_upload_queue import UploadTask, get_upload_queue
dag_data = _extract_dag_data(doc)
bom_data = _extract_bom_data(doc)
task = UploadTask(
doc_name=doc.Name,
part_number=part_number,
file_path=str(file_path),
properties=properties,
comment=comment,
dag_data=dag_data,
bom_data=bom_data,
)
get_upload_queue().enqueue(task)
class Silo_Save: class Silo_Save:
"""Save locally and upload to the server.""" """Save locally and upload to the server."""
@@ -981,22 +1042,8 @@ class Silo_Save:
FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n") FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
# Try to upload to server # Enqueue async upload (non-blocking)
try: _enqueue_upload(doc, part_number, file_path, properties, comment="Auto-save")
result = _client._upload_file(
part_number, str(file_path), properties, "Auto-save"
)
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n")
_push_dag_after_upload(doc, part_number, new_rev)
_push_bom_after_upload(doc, part_number, new_rev)
_update_manifest_revision(str(file_path), new_rev)
except Exception as e:
FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n")
FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n")
def IsActive(self): def IsActive(self):
return FreeCAD.ActiveDocument is not None and _server_mode == "normal" return FreeCAD.ActiveDocument is not None and _server_mode == "normal"
@@ -1036,24 +1083,23 @@ class Silo_Commit:
# Collect properties BEFORE saving to avoid dirtying the document # Collect properties BEFORE saving to avoid dirtying the document
properties = collect_document_properties(doc) properties = collect_document_properties(doc)
try:
file_path = _sync.save_to_canonical_path(doc, force_rename=True) file_path = _sync.save_to_canonical_path(doc, force_rename=True)
if not file_path: if not file_path:
FreeCAD.Console.PrintError("Could not determine save path\n")
return return
result = _client._upload_file( # Clear modified flag — data is safe on disk
part_number, str(file_path), properties, comment gui_doc = FreeCADGui.getDocument(doc.Name)
) if gui_doc and gui_doc.Modified:
try:
gui_doc.Modified = False
except Exception:
pass
new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n")
_push_dag_after_upload(doc, part_number, new_rev) # Enqueue async upload (non-blocking)
_push_bom_after_upload(doc, part_number, new_rev) _enqueue_upload(doc, part_number, file_path, properties, comment=comment)
_update_manifest_revision(str(file_path), new_rev)
except Exception as e:
FreeCAD.Console.PrintError(f"Commit failed: {e}\n")
def IsActive(self): def IsActive(self):
return FreeCAD.ActiveDocument is not None and _server_mode == "normal" return FreeCAD.ActiveDocument is not None and _server_mode == "normal"

View File

@@ -357,16 +357,17 @@ class SiloOrigin:
return None return None
def saveDocument(self, doc) -> bool: def saveDocument(self, doc) -> bool:
"""Save document and sync to Silo. """Save document locally and enqueue async upload to Silo.
Saves the document locally to the canonical path and uploads The local save is synchronous — the user's data is safe on disk
to Silo for sync. before this method returns. The network upload (file, DAG, BOM)
runs in a background thread via the upload queue.
Args: Args:
doc: FreeCAD App.Document doc: FreeCAD App.Document
Returns: Returns:
True if save succeeded True if the local save succeeded
""" """
if not doc: if not doc:
return False return False
@@ -380,21 +381,25 @@ class SiloOrigin:
return False return False
try: try:
# Save to canonical path # 1. Synchronous: save to disk
file_path = _sync.save_to_canonical_path(doc) file_path = _sync.save_to_canonical_path(doc)
if not file_path: if not file_path:
FreeCAD.Console.PrintError("Failed to save to canonical path\n") FreeCAD.Console.PrintError("Failed to save to canonical path\n")
return False return False
# Upload to Silo # 2. Synchronous: collect all data needed for upload
properties = collect_document_properties(doc) properties = collect_document_properties(doc)
_client._upload_file(obj.SiloPartNumber, str(file_path), properties, comment="")
# Clear modified flag (Modified is on Gui.Document, not App.Document) # 3. Synchronous: clear modified flag (data is safe on disk)
gui_doc = FreeCADGui.getDocument(doc.Name) gui_doc = FreeCADGui.getDocument(doc.Name)
if gui_doc: if gui_doc:
gui_doc.Modified = False gui_doc.Modified = False
# 4. Async: enqueue upload (non-blocking)
from silo_commands import _enqueue_upload
_enqueue_upload(doc, obj.SiloPartNumber, file_path, properties, comment="")
return True return True
except Exception as e: except Exception as e:
import traceback import traceback

View File

@@ -0,0 +1,61 @@
"""Status bar widget for the Silo upload queue.
Shows sync progress in the main window status bar:
- Hidden when idle
- "Syncing PN-001..." during upload
- "Synced PN-001 r3" briefly on success
- "Sync failed: PN-001" in red on failure
"""
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QLabel
class SyncStatusWidget(QLabel):
"""Compact status bar indicator for background Silo uploads."""
_FADE_MS = 4000 # how long the success message stays visible
def __init__(self, queue, parent=None):
super().__init__(parent)
self.setVisible(False)
self._queue = queue
self._fade_timer = QTimer(self)
self._fade_timer.setSingleShot(True)
self._fade_timer.timeout.connect(self._hide)
queue.upload_started.connect(self._on_started)
queue.upload_finished.connect(self._on_finished)
queue.upload_failed.connect(self._on_failed)
# -- slots ---------------------------------------------------------------
def _on_started(self, doc_name: str, part_number: str) -> None:
pending = self._queue.pending_count
if pending > 0:
self.setText(f"Syncing {part_number} (+{pending} queued)...")
else:
self.setText(f"Syncing {part_number}...")
self.setStyleSheet("")
self._fade_timer.stop()
self.setVisible(True)
def _on_finished(self, doc_name: str, part_number: str, revision: int) -> None:
pending = self._queue.pending_count
if pending > 0:
self.setText(f"Synced {part_number} r{revision} ({pending} remaining)")
else:
self.setText(f"Synced {part_number} r{revision}")
self.setStyleSheet("")
self._fade_timer.start(self._FADE_MS)
def _on_failed(self, doc_name: str, part_number: str, error: str) -> None:
self.setText(f"Sync failed: {part_number}")
self.setToolTip(error)
self.setStyleSheet("color: #f38ba8;") # Catppuccin Mocha red
self._fade_timer.stop()
self.setVisible(True)
def _hide(self) -> None:
self.setVisible(False)
self.setToolTip("")

View File

@@ -0,0 +1,250 @@
"""Async upload queue for Silo PLM.
Provides a background ``QThread`` that processes file uploads, DAG syncs,
and BOM syncs without blocking the main UI thread. All data required for
an upload is captured on the main thread before enqueuing — the worker
never accesses ``App.Document``, ``Gui.Document``, or any Qt widget.
Typical usage from ``SiloOrigin.saveDocument``::
task = UploadTask(
doc_name=doc.Name,
part_number=obj.SiloPartNumber,
file_path=str(saved_path),
properties=collected_props,
comment="",
dag_data=extracted_dag,
bom_data=extracted_bom,
)
get_upload_queue().enqueue(task)
"""
import collections
import dataclasses
import threading
import time
import FreeCAD
from PySide6.QtCore import QThread, Signal
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class UploadTask:
"""Immutable snapshot of everything needed to upload a document revision.
All fields are plain Python types — no FreeCAD or Qt objects.
"""
doc_name: str
"""FreeCAD document internal name (used for coalescing)."""
part_number: str
"""Silo part number."""
file_path: str
"""Absolute path to the saved ``.kc`` file on disk."""
properties: dict
"""Pre-collected document properties dict."""
comment: str = ""
"""Revision comment (empty for auto-save)."""
dag_data: tuple | None = None
"""Pre-extracted ``(nodes, edges)`` or ``None``."""
bom_data: object | None = None
"""Pre-extracted BOM payload or ``None``."""
timestamp: float = dataclasses.field(default_factory=time.time)
"""Time the task was created (``time.time()``)."""
# ---------------------------------------------------------------------------
# Worker thread
# ---------------------------------------------------------------------------
class UploadWorker(QThread):
"""Background thread that processes :class:`UploadTask` items serially.
Signals are emitted on completion so the main thread can update the UI.
The worker never touches FreeCAD documents or Qt widgets.
"""
upload_started = Signal(str, str) # doc_name, part_number
upload_finished = Signal(str, str, int) # doc_name, part_number, revision
upload_failed = Signal(str, str, str) # doc_name, part_number, error
queue_empty = Signal()
def __init__(self, parent=None):
super().__init__(parent)
self._queue: collections.deque[UploadTask] = collections.deque()
self._lock = threading.Lock()
self._event = threading.Event()
self._stop = False
# -- public API (called from main thread) --------------------------------
def enqueue(self, task: UploadTask) -> None:
"""Add a task, coalescing any pending task for the same document."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != task.doc_name)
self._queue.append(task)
self._event.set()
def cancel(self, doc_name: str) -> None:
"""Remove any pending (not yet started) task for *doc_name*."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != doc_name)
@property
def pending_count(self) -> int:
"""Number of tasks waiting to be processed."""
with self._lock:
return len(self._queue)
def shutdown(self) -> None:
"""Signal the worker to stop and wait up to 5 s for it to finish."""
self._stop = True
self._event.set()
self.wait(5000)
# -- thread entry point --------------------------------------------------
def run(self) -> None: # noqa: D401 — Qt override
"""Process tasks until :meth:`shutdown` is called."""
from silo_commands import _client, _update_manifest_revision
while not self._stop:
self._event.wait(timeout=1.0)
self._event.clear()
while not self._stop:
task = self._pop_task()
if task is None:
break
self._process(task, _client)
if not self._stop and self.pending_count == 0:
self.queue_empty.emit()
# -- internals -----------------------------------------------------------
def _pop_task(self) -> UploadTask | None:
with self._lock:
return self._queue.popleft() if self._queue else None
def _process(self, task: UploadTask, client) -> None:
self.upload_started.emit(task.doc_name, task.part_number)
try:
result = client._upload_file(
task.part_number, task.file_path, task.properties, task.comment
)
rev = result.get("revision_number", 0)
# DAG sync (non-critical)
if task.dag_data:
try:
nodes, edges = task.dag_data
client.push_dag(task.part_number, rev, nodes, edges)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] DAG sync failed for {task.part_number}: {exc}\n"
)
# BOM sync (non-critical)
if task.bom_data:
try:
_push_bom_from_extracted(task.part_number, task.bom_data, client)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] BOM sync failed for {task.part_number}: {exc}\n"
)
# Manifest revision update (local file I/O, safe from worker)
try:
from silo_commands import _update_manifest_revision
_update_manifest_revision(task.file_path, rev)
except Exception:
pass
self.upload_finished.emit(task.doc_name, task.part_number, rev)
FreeCAD.Console.PrintMessage(
f"[upload-queue] Uploaded {task.part_number} as revision {rev}\n"
)
except Exception as exc:
self.upload_failed.emit(task.doc_name, task.part_number, str(exc))
FreeCAD.Console.PrintWarning(
f"[upload-queue] Upload failed for {task.part_number}: {exc}\n"
)
def _push_bom_from_extracted(part_number: str, bom_data, client) -> None:
"""Diff and apply pre-extracted BOM entries against the server.
*bom_data* is a list of resolved ``bom_sync.BomEntry`` dataclasses
(with ``part_number`` populated) produced by
:func:`silo_commands._extract_bom_data` on the main thread.
This function fetches the current server BOM, diffs it against the
local entries, and applies adds/quantity updates. No FreeCAD
document access is needed — safe to call from the worker thread.
"""
if not bom_data:
return
from bom_sync import apply_bom_diff, diff_bom
try:
remote = client.get_bom(part_number)
except Exception:
remote = []
diff = diff_bom(bom_data, remote)
result = apply_bom_diff(diff, part_number, client)
parts = []
if result.added_count:
parts.append(f"+{result.added_count} added")
if result.updated_count:
parts.append(f"~{result.updated_count} qty updated")
if parts:
FreeCAD.Console.PrintMessage(
f"[upload-queue] BOM synced for {part_number}: {', '.join(parts)}\n"
)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_upload_queue: UploadWorker | None = None
def get_upload_queue() -> UploadWorker:
"""Return the global upload queue, creating it on first call."""
global _upload_queue
if _upload_queue is None:
_upload_queue = UploadWorker()
_upload_queue.start()
return _upload_queue
def shutdown_upload_queue() -> None:
"""Stop the global upload queue if running."""
global _upload_queue
if _upload_queue is not None:
count = _upload_queue.pending_count
if count:
FreeCAD.Console.PrintWarning(
f"[upload-queue] Waiting for {count} pending upload(s)...\n"
)
_upload_queue.shutdown()
_upload_queue = None