Compare commits

...

1 Commits

Author SHA1 Message Date
c5f00219fa feat(silo): async save queue — background upload with coalescing (#392)
Local save is synchronous (data safe on disk before returning).
Network upload (file, DAG, BOM) runs in a background QThread.

New files:
- silo_upload_queue.py: UploadTask dataclass + UploadWorker(QThread)
  with coalescing, cancel, shutdown, and Qt signals
- silo_status_widget.py: SyncStatusWidget for status bar feedback

Modified:
- silo_commands.py: _extract_dag_data, _extract_bom_data, _enqueue_upload
  helpers; Silo_Save and Silo_Commit use async enqueue
- silo_origin.py: saveDocument() saves locally then enqueues async upload
- InitGui.py: upload queue init at 700ms, shutdown handler via atexit

Thread safety: worker never accesses App.Document or Qt widgets;
all data pre-extracted on main thread as plain dicts/strings.
Queue uses threading.Lock; signals cross thread boundary via Qt.
2026-03-05 11:08:44 -06:00
5 changed files with 450 additions and 39 deletions

View File

@@ -251,6 +251,53 @@ def _handle_startup_urls():
handle_kindred_url(arg)
# ---------------------------------------------------------------------------
# Upload queue — background thread for async file uploads
# ---------------------------------------------------------------------------
def _setup_upload_queue():
"""Start the upload queue and register the status bar widget."""
try:
from silo_upload_queue import get_upload_queue
queue = get_upload_queue()
# Status bar widget
try:
from silo_status_widget import SyncStatusWidget
mw = FreeCADGui.getMainWindow()
if mw and mw.statusBar():
widget = SyncStatusWidget(queue, parent=mw.statusBar())
mw.statusBar().addPermanentWidget(widget)
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: status widget skipped: {e}\n")
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: upload queue skipped: {e}\n")
def _shutdown_upload_queue():
"""Drain the upload queue on application exit."""
try:
from silo_upload_queue import shutdown_upload_queue
shutdown_upload_queue()
except Exception:
pass
def _register_shutdown_handler():
"""Connect the upload queue shutdown to application exit."""
try:
import atexit
atexit.register(_shutdown_upload_queue)
except Exception as e:
FreeCAD.Console.PrintLog(f"Silo: shutdown handler skipped: {e}\n")
# ---------------------------------------------------------------------------
# Deferred setup — staggered timers for non-blocking startup
# ---------------------------------------------------------------------------
@@ -259,6 +306,8 @@ from PySide import QtCore as _QtCore
_QtCore.QTimer.singleShot(500, _handle_startup_urls)
_QtCore.QTimer.singleShot(600, _register_silo_document_observer)
_QtCore.QTimer.singleShot(700, _setup_upload_queue)
_QtCore.QTimer.singleShot(800, _register_shutdown_handler)
_QtCore.QTimer.singleShot(2000, _setup_silo_auth_panel)
_QtCore.QTimer.singleShot(2500, _register_silo_overlay)
_QtCore.QTimer.singleShot(3000, _check_silo_first_start)

View File

@@ -920,6 +920,67 @@ def _push_bom_after_upload(doc, part_number, revision_number):
FreeCAD.Console.PrintWarning(f"BOM sync failed: {e}\n")
def _extract_dag_data(doc):
"""Pre-extract DAG data on the main thread for async upload.
Returns ``(nodes, edges)`` or ``None`` if extraction fails or
the document has no meaningful DAG.
"""
try:
from dag import extract_dag
nodes, edges = extract_dag(doc)
return (nodes, edges) if nodes else None
except Exception:
return None
def _extract_bom_data(doc):
"""Pre-extract BOM entries on the main thread for async upload.
Returns a list of resolved :class:`bom_sync.BomEntry` objects
(with ``part_number`` populated) or ``None`` if the document is
not an assembly or has no cross-document links.
Resolution calls ``_client.get_item_by_uuid()`` which is a fast
DB lookup. This must run on the main thread because it accesses
``App.Document`` objects to read ``SiloItemId`` properties.
"""
try:
from bom_sync import extract_bom_entries, resolve_entries
entries = extract_bom_entries(doc)
if not entries:
return None
resolved, _unresolved = resolve_entries(entries, _client)
return resolved if resolved else None
except Exception:
return None
def _enqueue_upload(doc, part_number, file_path, properties, comment=""):
"""Capture all upload data on the main thread and enqueue for async upload.
The caller must have already saved the file to *file_path* and
collected *properties* synchronously.
"""
from silo_upload_queue import UploadTask, get_upload_queue
dag_data = _extract_dag_data(doc)
bom_data = _extract_bom_data(doc)
task = UploadTask(
doc_name=doc.Name,
part_number=part_number,
file_path=str(file_path),
properties=properties,
comment=comment,
dag_data=dag_data,
bom_data=bom_data,
)
get_upload_queue().enqueue(task)
class Silo_Save:
"""Save locally and upload to the server."""
@@ -981,22 +1042,8 @@ class Silo_Save:
FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
# Try to upload to server
try:
result = _client._upload_file(
part_number, str(file_path), properties, "Auto-save"
)
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n")
_push_dag_after_upload(doc, part_number, new_rev)
_push_bom_after_upload(doc, part_number, new_rev)
_update_manifest_revision(str(file_path), new_rev)
except Exception as e:
FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n")
FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n")
# Enqueue async upload (non-blocking)
_enqueue_upload(doc, part_number, file_path, properties, comment="Auto-save")
def IsActive(self):
return FreeCAD.ActiveDocument is not None and _server_mode == "normal"
@@ -1036,24 +1083,23 @@ class Silo_Commit:
# Collect properties BEFORE saving to avoid dirtying the document
properties = collect_document_properties(doc)
try:
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
if not file_path:
return
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
if not file_path:
FreeCAD.Console.PrintError("Could not determine save path\n")
return
result = _client._upload_file(
part_number, str(file_path), properties, comment
)
# Clear modified flag — data is safe on disk
gui_doc = FreeCADGui.getDocument(doc.Name)
if gui_doc and gui_doc.Modified:
try:
gui_doc.Modified = False
except Exception:
pass
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n")
FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
_push_dag_after_upload(doc, part_number, new_rev)
_push_bom_after_upload(doc, part_number, new_rev)
_update_manifest_revision(str(file_path), new_rev)
except Exception as e:
FreeCAD.Console.PrintError(f"Commit failed: {e}\n")
# Enqueue async upload (non-blocking)
_enqueue_upload(doc, part_number, file_path, properties, comment=comment)
def IsActive(self):
return FreeCAD.ActiveDocument is not None and _server_mode == "normal"

View File

@@ -357,16 +357,17 @@ class SiloOrigin:
return None
def saveDocument(self, doc) -> bool:
"""Save document and sync to Silo.
"""Save document locally and enqueue async upload to Silo.
Saves the document locally to the canonical path and uploads
to Silo for sync.
The local save is synchronous — the user's data is safe on disk
before this method returns. The network upload (file, DAG, BOM)
runs in a background thread via the upload queue.
Args:
doc: FreeCAD App.Document
Returns:
True if save succeeded
True if the local save succeeded
"""
if not doc:
return False
@@ -380,21 +381,25 @@ class SiloOrigin:
return False
try:
# Save to canonical path
# 1. Synchronous: save to disk
file_path = _sync.save_to_canonical_path(doc)
if not file_path:
FreeCAD.Console.PrintError("Failed to save to canonical path\n")
return False
# Upload to Silo
# 2. Synchronous: collect all data needed for upload
properties = collect_document_properties(doc)
_client._upload_file(obj.SiloPartNumber, str(file_path), properties, comment="")
# Clear modified flag (Modified is on Gui.Document, not App.Document)
# 3. Synchronous: clear modified flag (data is safe on disk)
gui_doc = FreeCADGui.getDocument(doc.Name)
if gui_doc:
gui_doc.Modified = False
# 4. Async: enqueue upload (non-blocking)
from silo_commands import _enqueue_upload
_enqueue_upload(doc, obj.SiloPartNumber, file_path, properties, comment="")
return True
except Exception as e:
import traceback

View File

@@ -0,0 +1,61 @@
"""Status bar widget for the Silo upload queue.
Shows sync progress in the main window status bar:
- Hidden when idle
- "Syncing PN-001..." during upload
- "Synced PN-001 r3" briefly on success
- "Sync failed: PN-001" in red on failure
"""
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QLabel
class SyncStatusWidget(QLabel):
"""Compact status bar indicator for background Silo uploads."""
_FADE_MS = 4000 # how long the success message stays visible
def __init__(self, queue, parent=None):
super().__init__(parent)
self.setVisible(False)
self._queue = queue
self._fade_timer = QTimer(self)
self._fade_timer.setSingleShot(True)
self._fade_timer.timeout.connect(self._hide)
queue.upload_started.connect(self._on_started)
queue.upload_finished.connect(self._on_finished)
queue.upload_failed.connect(self._on_failed)
# -- slots ---------------------------------------------------------------
def _on_started(self, doc_name: str, part_number: str) -> None:
pending = self._queue.pending_count
if pending > 0:
self.setText(f"Syncing {part_number} (+{pending} queued)...")
else:
self.setText(f"Syncing {part_number}...")
self.setStyleSheet("")
self._fade_timer.stop()
self.setVisible(True)
def _on_finished(self, doc_name: str, part_number: str, revision: int) -> None:
pending = self._queue.pending_count
if pending > 0:
self.setText(f"Synced {part_number} r{revision} ({pending} remaining)")
else:
self.setText(f"Synced {part_number} r{revision}")
self.setStyleSheet("")
self._fade_timer.start(self._FADE_MS)
def _on_failed(self, doc_name: str, part_number: str, error: str) -> None:
self.setText(f"Sync failed: {part_number}")
self.setToolTip(error)
self.setStyleSheet("color: #f38ba8;") # Catppuccin Mocha red
self._fade_timer.stop()
self.setVisible(True)
def _hide(self) -> None:
self.setVisible(False)
self.setToolTip("")

View File

@@ -0,0 +1,250 @@
"""Async upload queue for Silo PLM.
Provides a background ``QThread`` that processes file uploads, DAG syncs,
and BOM syncs without blocking the main UI thread. All data required for
an upload is captured on the main thread before enqueuing — the worker
never accesses ``App.Document``, ``Gui.Document``, or any Qt widget.
Typical usage from ``SiloOrigin.saveDocument``::
task = UploadTask(
doc_name=doc.Name,
part_number=obj.SiloPartNumber,
file_path=str(saved_path),
properties=collected_props,
comment="",
dag_data=extracted_dag,
bom_data=extracted_bom,
)
get_upload_queue().enqueue(task)
"""
import collections
import dataclasses
import threading
import time
import FreeCAD
from PySide6.QtCore import QThread, Signal
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class UploadTask:
"""Immutable snapshot of everything needed to upload a document revision.
All fields are plain Python types — no FreeCAD or Qt objects.
"""
doc_name: str
"""FreeCAD document internal name (used for coalescing)."""
part_number: str
"""Silo part number."""
file_path: str
"""Absolute path to the saved ``.kc`` file on disk."""
properties: dict
"""Pre-collected document properties dict."""
comment: str = ""
"""Revision comment (empty for auto-save)."""
dag_data: tuple | None = None
"""Pre-extracted ``(nodes, edges)`` or ``None``."""
bom_data: object | None = None
"""Pre-extracted BOM payload or ``None``."""
timestamp: float = dataclasses.field(default_factory=time.time)
"""Time the task was created (``time.time()``)."""
# ---------------------------------------------------------------------------
# Worker thread
# ---------------------------------------------------------------------------
class UploadWorker(QThread):
"""Background thread that processes :class:`UploadTask` items serially.
Signals are emitted on completion so the main thread can update the UI.
The worker never touches FreeCAD documents or Qt widgets.
"""
upload_started = Signal(str, str) # doc_name, part_number
upload_finished = Signal(str, str, int) # doc_name, part_number, revision
upload_failed = Signal(str, str, str) # doc_name, part_number, error
queue_empty = Signal()
def __init__(self, parent=None):
super().__init__(parent)
self._queue: collections.deque[UploadTask] = collections.deque()
self._lock = threading.Lock()
self._event = threading.Event()
self._stop = False
# -- public API (called from main thread) --------------------------------
def enqueue(self, task: UploadTask) -> None:
"""Add a task, coalescing any pending task for the same document."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != task.doc_name)
self._queue.append(task)
self._event.set()
def cancel(self, doc_name: str) -> None:
"""Remove any pending (not yet started) task for *doc_name*."""
with self._lock:
self._queue = collections.deque(t for t in self._queue if t.doc_name != doc_name)
@property
def pending_count(self) -> int:
"""Number of tasks waiting to be processed."""
with self._lock:
return len(self._queue)
def shutdown(self) -> None:
"""Signal the worker to stop and wait up to 5 s for it to finish."""
self._stop = True
self._event.set()
self.wait(5000)
# -- thread entry point --------------------------------------------------
def run(self) -> None: # noqa: D401 — Qt override
"""Process tasks until :meth:`shutdown` is called."""
from silo_commands import _client, _update_manifest_revision
while not self._stop:
self._event.wait(timeout=1.0)
self._event.clear()
while not self._stop:
task = self._pop_task()
if task is None:
break
self._process(task, _client)
if not self._stop and self.pending_count == 0:
self.queue_empty.emit()
# -- internals -----------------------------------------------------------
def _pop_task(self) -> UploadTask | None:
with self._lock:
return self._queue.popleft() if self._queue else None
def _process(self, task: UploadTask, client) -> None:
self.upload_started.emit(task.doc_name, task.part_number)
try:
result = client._upload_file(
task.part_number, task.file_path, task.properties, task.comment
)
rev = result.get("revision_number", 0)
# DAG sync (non-critical)
if task.dag_data:
try:
nodes, edges = task.dag_data
client.push_dag(task.part_number, rev, nodes, edges)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] DAG sync failed for {task.part_number}: {exc}\n"
)
# BOM sync (non-critical)
if task.bom_data:
try:
_push_bom_from_extracted(task.part_number, task.bom_data, client)
except Exception as exc:
FreeCAD.Console.PrintWarning(
f"[upload-queue] BOM sync failed for {task.part_number}: {exc}\n"
)
# Manifest revision update (local file I/O, safe from worker)
try:
from silo_commands import _update_manifest_revision
_update_manifest_revision(task.file_path, rev)
except Exception:
pass
self.upload_finished.emit(task.doc_name, task.part_number, rev)
FreeCAD.Console.PrintMessage(
f"[upload-queue] Uploaded {task.part_number} as revision {rev}\n"
)
except Exception as exc:
self.upload_failed.emit(task.doc_name, task.part_number, str(exc))
FreeCAD.Console.PrintWarning(
f"[upload-queue] Upload failed for {task.part_number}: {exc}\n"
)
def _push_bom_from_extracted(part_number: str, bom_data, client) -> None:
"""Diff and apply pre-extracted BOM entries against the server.
*bom_data* is a list of resolved ``bom_sync.BomEntry`` dataclasses
(with ``part_number`` populated) produced by
:func:`silo_commands._extract_bom_data` on the main thread.
This function fetches the current server BOM, diffs it against the
local entries, and applies adds/quantity updates. No FreeCAD
document access is needed — safe to call from the worker thread.
"""
if not bom_data:
return
from bom_sync import apply_bom_diff, diff_bom
try:
remote = client.get_bom(part_number)
except Exception:
remote = []
diff = diff_bom(bom_data, remote)
result = apply_bom_diff(diff, part_number, client)
parts = []
if result.added_count:
parts.append(f"+{result.added_count} added")
if result.updated_count:
parts.append(f"~{result.updated_count} qty updated")
if parts:
FreeCAD.Console.PrintMessage(
f"[upload-queue] BOM synced for {part_number}: {', '.join(parts)}\n"
)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_upload_queue: UploadWorker | None = None
def get_upload_queue() -> UploadWorker:
"""Return the global upload queue, creating it on first call."""
global _upload_queue
if _upload_queue is None:
_upload_queue = UploadWorker()
_upload_queue.start()
return _upload_queue
def shutdown_upload_queue() -> None:
"""Stop the global upload queue if running."""
global _upload_queue
if _upload_queue is not None:
count = _upload_queue.pending_count
if count:
FreeCAD.Console.PrintWarning(
f"[upload-queue] Waiting for {count} pending upload(s)...\n"
)
_upload_queue.shutdown()
_upload_queue = None