"""Silo FreeCAD commands - Streamlined workflow for CAD file management.""" import json import os import re import socket import urllib.error import urllib.parse import urllib.request from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import FreeCAD import FreeCADGui from PySide import QtCore from silo_client import SiloClient, SiloSettings # Preference group for Kindred Silo settings _PREF_GROUP = "User parameter:BaseApp/Preferences/Mod/KindredSilo" # Configuration - preferences take priority over env vars SILO_PROJECTS_DIR = os.environ.get( "SILO_PROJECTS_DIR", os.path.expanduser("~/projects") ) # --------------------------------------------------------------------------- # Local utility helpers (previously in silo_client, now server-driven) # --------------------------------------------------------------------------- def _parse_part_number(part_number: str) -> Tuple[str, str]: """Parse part number into ``(category, sequence)``. E.g. 
``"F01-0001"`` -> ``("F01", "0001")``.""" parts = part_number.split("-") if len(parts) >= 2: return parts[0], parts[1] return part_number, "" def _sanitize_filename(name: str) -> str: """Sanitize a string for use in filenames.""" sanitized = re.sub(r'[<>:"/\\|?*]', "_", name) sanitized = re.sub(r"[\s_]+", "_", sanitized) sanitized = sanitized.strip("_ ") return sanitized[:50] def _relative_time(dt): """Format a datetime as a human-friendly relative string.""" now = datetime.now() diff = now - dt seconds = int(diff.total_seconds()) if seconds < 60: return "just now" minutes = seconds // 60 if minutes < 60: return f"{minutes}m ago" hours = minutes // 60 if hours < 24: return f"{hours}h ago" days = hours // 24 if days < 30: return f"{days}d ago" return dt.strftime("%Y-%m-%d") # --------------------------------------------------------------------------- # FreeCAD settings adapter # --------------------------------------------------------------------------- class FreeCADSiloSettings(SiloSettings): """SiloSettings backed by FreeCAD preferences.""" def get_api_url(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) url = param.GetString("ApiUrl", "") if not url: url = os.environ.get("SILO_API_URL", "http://localhost:8080/api") url = url.rstrip("/") if url: parsed = urllib.parse.urlparse(url) if not parsed.path or parsed.path == "/": url = url + "/api" return url def get_api_token(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) token = param.GetString("ApiToken", "") if not token: token = os.environ.get("SILO_API_TOKEN", "") return token def get_ssl_verify(self) -> bool: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetBool("SslVerify", True) def get_ssl_cert_path(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("SslCertPath", "") def save_auth( self, username: str, role: str = "", source: str = "", token: str = "" ): param = FreeCAD.ParamGet(_PREF_GROUP) param.SetString("AuthUsername", username) param.SetString("AuthRole", role) 
param.SetString("AuthSource", source) if token: param.SetString("ApiToken", token) def get_schema_name(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) name = param.GetString("SchemaName", "") if not name: name = os.environ.get("SILO_SCHEMA", "kindred-rd") return name def clear_auth(self): param = FreeCAD.ParamGet(_PREF_GROUP) param.SetString("ApiToken", "") param.SetString("AuthUsername", "") param.SetString("AuthRole", "") param.SetString("AuthSource", "") _fc_settings = FreeCADSiloSettings() # --------------------------------------------------------------------------- # Auth info helpers (read from preferences for UI display) # --------------------------------------------------------------------------- def _get_auth_username() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthUsername", "") def _get_auth_role() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthRole", "") def _get_auth_source() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthSource", "") def _get_auth_token() -> str: return _fc_settings.get_api_token() # Thin wrappers so command classes can call these without refactoring. # They delegate to the settings adapter or the shared client. 
def _get_api_url() -> str:
    """Return the API base URL from the settings adapter."""
    return _fc_settings.get_api_url()


def _get_schema_name() -> str:
    """Return the schema name from the settings adapter."""
    return _fc_settings.get_schema_name()


def _get_ssl_verify() -> bool:
    """Return the SSL-verification flag from the settings adapter."""
    return _fc_settings.get_ssl_verify()


def _get_ssl_context():
    """Build an SSL context from the current verify flag and certificate path."""
    from silo_client._ssl import build_ssl_context

    return build_ssl_context(
        _fc_settings.get_ssl_verify(), _fc_settings.get_ssl_cert_path()
    )


def _get_auth_headers() -> Dict[str, str]:
    """Return a ``Bearer`` Authorization header, or ``{}`` when no token is set."""
    token = _fc_settings.get_api_token()
    if not token:
        token = os.environ.get("SILO_API_TOKEN", "")
    if token:
        return {"Authorization": f"Bearer {token}"}
    return {}


def _save_auth_info(username: str, role: str = "", source: str = "", token: str = ""):
    """Persist authentication info through the settings adapter."""
    _fc_settings.save_auth(username, role, source, token)


def _clear_auth():
    """Clear stored authentication info through the settings adapter."""
    _fc_settings.clear_auth()


# ---------------------------------------------------------------------------
# Server mode tracking
# ---------------------------------------------------------------------------

_server_mode = "offline"  # "normal" | "read-only" | "offline"


def _fetch_server_mode() -> str:
    """Fetch server mode from the /ready endpoint.

    The endpoint URL is the API URL with a trailing ``/api`` removed, plus
    ``/ready``. Any failure (network, HTTP error, invalid JSON, unexpected
    payload shape) is reported as ``"offline"``.

    Returns one of: "normal", "read-only", "offline".
    """
    api_url = _get_api_url().rstrip("/")
    base_url = api_url[:-4] if api_url.endswith("/api") else api_url
    url = f"{base_url}/ready"

    try:
        req = urllib.request.Request(url, method="GET")
        # The context manager closes the HTTP response; previously it was
        # left open until garbage collection.
        with urllib.request.urlopen(
            req, context=_get_ssl_context(), timeout=10
        ) as resp:
            body = resp.read(4096).decode("utf-8", errors="replace")
        data = json.loads(body)
        status = data.get("status", "")
        if status in ("ok", "ready"):
            return "normal"
        if status in ("read-only", "read_only", "readonly"):
            return "read-only"
        # Unknown status but server responded — treat as normal
        return "normal"
    except Exception:
        return "offline"


# ---------------------------------------------------------------------------
# Icon helper
# ---------------------------------------------------------------------------

_ICON_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "resources", "icons"
)


def _icon(name):
    """Get icon path by name.

    Returns the path of ``silo-{name}.svg`` in the icon directory, or ``""``
    when that file does not exist.
    """
    if _ICON_DIR:
        path = os.path.join(_ICON_DIR, f"silo-{name}.svg")
        if os.path.exists(path):
            return path
    return ""


# ---------------------------------------------------------------------------
# FreeCAD-specific path utilities
# ---------------------------------------------------------------------------


def get_projects_dir() -> Path:
    """Get the projects directory, creating it if it does not exist."""
    projects_dir = Path(SILO_PROJECTS_DIR)
    projects_dir.mkdir(parents=True, exist_ok=True)
    return projects_dir


def get_cad_file_path(part_number: str, description: str = "") -> Path:
    """Generate canonical file path for a CAD file.

    Path format: ~/projects/cad/{category_code}/{part_number}_{description}.kc

    The description is sanitized for filename use; when it is empty the file
    is named ``{part_number}.kc``.
    """
    category, _ = _parse_part_number(part_number)
    if description:
        filename = f"{part_number}_{_sanitize_filename(description)}.kc"
    else:
        filename = f"{part_number}.kc"
    return get_projects_dir() / "cad" / category / filename


def _find_part_file(directory: Path, part_number: str, suffix: str) -> Optional[Path]:
    """Return the first file (sorted by path) in *directory* with *suffix* whose
    stem is *part_number* or starts with ``{part_number}_``."""
    described_prefix = part_number + "_"
    try:
        candidates = sorted(
            entry
            for entry in directory.iterdir()
            if entry.suffix == suffix
            and (entry.stem == part_number or entry.stem.startswith(described_prefix))
            and entry.is_file()
        )
    except OSError:
        # Unreadable directory: treat it as containing no match.
        return None
    return candidates[0] if candidates else None


def find_file_by_part_number(part_number: str) -> Optional[Path]:
    """Find existing CAD file for a part number. Prefers .kc over .FCStd.

    The part's own category directory is searched first, then every other
    directory under ``cad/``. Both ``{part_number}.ext`` and
    ``{part_number}_{description}.ext`` are recognised; the earlier pattern
    only matched the bare name, so files written by ``get_cad_file_path``
    with a description were never found.
    """
    category, _ = _parse_part_number(part_number)
    cad_dir = get_projects_dir() / "cad" / category

    for search_dir in _search_dirs(cad_dir):
        for suffix in (".kc", ".FCStd"):
            match = _find_part_file(search_dir, part_number, suffix)
            if match is not None:
                return match

    return None


def _search_dirs(category_dir: Path):
    """Yield the category dir, then all sibling dirs under cad/."""
    if category_dir.exists():
        yield category_dir
    base_cad_dir = category_dir.parent
    if base_cad_dir.exists():
        for subdir in base_cad_dir.iterdir():
            if subdir.is_dir() and subdir != category_dir:
                yield subdir


def search_local_files(search_term: str = "", category_filter: str = "") -> list:
    """Search for CAD files in local cad directory.

    Args:
        search_term: Case-insensitive substring matched against
            ``"{part_number} {description}"``; empty matches everything.
        category_filter: Case-insensitive category code to restrict to.

    Returns:
        A list of dicts with keys ``path``, ``part_number``, ``description``,
        ``category``, ``modified`` (ISO timestamp or ``None``) and ``source``
        (always ``"local"``), sorted by ``modified`` descending.
    """
    results = []
    cad_dir = get_projects_dir() / "cad"
    if not cad_dir.exists():
        return results

    search_lower = search_term.lower()

    for category_dir in cad_dir.iterdir():
        if not category_dir.is_dir():
            continue

        # The category code is the folder name up to the first underscore.
        category_code = category_dir.name.split("_")[0]

        if category_filter and category_code.upper() != category_filter.upper():
            continue

        for fcstd_file in sorted(
            list(category_dir.glob("*.kc")) + list(category_dir.glob("*.FCStd"))
        ):
            part_number, _, raw_description = fcstd_file.stem.partition("_")
            description = raw_description.replace("_", " ")

            if search_term:
                searchable = f"{part_number} {description}".lower()
                if search_lower not in searchable:
                    continue

            try:
                mtime = fcstd_file.stat().st_mtime
                modified = datetime.fromtimestamp(mtime).isoformat()
            except (OSError, OverflowError, ValueError):
                # File vanished or has an unrepresentable timestamp.
                modified = None

            results.append(
                {
                    "path": str(fcstd_file),
                    "part_number": part_number,
                    "description": description,
                    "category": category_code,
                    "modified": modified,
                    "source": "local",
                }
            )

    results.sort(key=lambda x: x.get("modified") or "", reverse=True)
    return results


def _safe_float(val):
    """Convert float to JSON-safe value, handling NaN and Infinity.

    NaN and +/-Infinity become ``0.0``; every other value (including
    non-floats) is returned unchanged.
    """
    import math

    if isinstance(val, float):
        if math.isnan(val) or math.isinf(val):
            return 0.0
    return val


def collect_document_properties(doc) -> Dict[str, Any]:
    """Collect properties from all objects in a document.

    Objects whose ``TypeId`` starts with ``App::`` are skipped, except
    ``App::Part``. For each remaining object the type, label, placement (if
    present) and shape bounding box / volume (if present) are recorded under
    ``result["objects"][obj.Label]``.
    """
    result = {
        "_document_name": doc.Name,
        "_file_name": doc.FileName or None,
        "objects": {},
    }

    for obj in doc.Objects:
        if obj.TypeId.startswith("App::") and obj.TypeId not in ("App::Part",):
            continue

        props = {"_object_type": obj.TypeId, "_label": obj.Label}

        if hasattr(obj, "Placement"):
            p = obj.Placement
            props["placement"] = {
                "position": {
                    "x": _safe_float(p.Base.x),
                    "y": _safe_float(p.Base.y),
                    "z": _safe_float(p.Base.z),
                },
                "rotation": {
                    "axis": {
                        "x": _safe_float(p.Rotation.Axis.x),
                        "y": _safe_float(p.Rotation.Axis.y),
                        "z": _safe_float(p.Rotation.Axis.z),
                    },
                    "angle": _safe_float(p.Rotation.Angle),
                },
            }

        if hasattr(obj, "Shape") and obj.Shape:
            try:
                bbox = obj.Shape.BoundBox
                props["bounding_box"] = {
                    "x_length": _safe_float(bbox.XLength),
                    "y_length": _safe_float(bbox.YLength),
                    "z_length": _safe_float(bbox.ZLength),
                }
                if hasattr(obj.Shape, "Volume"):
                    props["volume"] = _safe_float(obj.Shape.Volume)
            except Exception:
                # Geometry data is best-effort: a shape that cannot report
                # its bounds must not prevent the rest from being collected.
                pass

        result["objects"][obj.Label] = props

    return result


def set_silo_properties(obj, props: Dict[str, Any]):
    """Set Silo properties on FreeCAD object.

    A missing property is first added in the ``"Silo"`` group as a string or
    integer property according to the value's Python type; the value is then
    assigned with ``setattr`` whether or not a property was added.
    """
    for name, value in props.items():
        if not hasattr(obj, name):
            if isinstance(value, str):
                obj.addProperty("App::PropertyString", name, "Silo", "")
            elif isinstance(value, int):
                obj.addProperty("App::PropertyInteger", name, "Silo", "")
        setattr(obj, name, value)


def get_tracked_object(doc):
    """Find the primary tracked object in a document.

    Returns the first object with a non-empty ``SiloPartNumber``, or ``None``.
    """
    for obj in doc.Objects:
        if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber:
            return obj
    return None


# ---------------------------------------------------------------------------
# Client and sync instances
# ---------------------------------------------------------------------------

_client = SiloClient(_fc_settings)


class SiloSync:
    """Handles synchronization between FreeCAD and Silo."""

    def __init__(self, client: Optional[SiloClient] = None):
        """Use *client*, or the module-level shared client when omitted."""
        self.client = client or _client

    @staticmethod
    def _silo_properties(
        item: Dict[str, Any], part_number: str, item_type: str
    ) -> Dict[str, Any]:
        """Build the Silo tracking properties stamped onto a root object."""
        return {
            "SiloItemId": item.get("id", ""),
            "SiloPartNumber": part_number,
            "SiloRevision": item.get("current_revision", 1),
            "SiloItemType": item_type,
        }

    def save_to_canonical_path(self, doc, force_rename: bool = False) -> Optional[Path]:
        """Save document to canonical path.

        The canonical path is derived from the tracked part number and the
        item description fetched from the server.

        Returns:
            The canonical path on success; ``None`` when the document has no
            tracked object or anything fails (the error is printed to the
            FreeCAD console).
        """
        obj = get_tracked_object(doc)
        if not obj:
            return None

        part_number = obj.SiloPartNumber

        try:
            item = self.client.get_item(part_number)
            description = item.get("description", "")
            new_path = get_cad_file_path(part_number, description)
            new_path.parent.mkdir(parents=True, exist_ok=True)

            existing_path = find_file_by_part_number(part_number)
            current_path = Path(doc.FileName) if doc.FileName else None

            # Use save() if already at the correct path, saveAs() only if path changes
            if current_path and current_path == new_path:
                doc.save()
            elif (
                existing_path
                and existing_path != new_path
                and (force_rename or current_path == existing_path)
            ):
                # Rename: write the new file, then remove the superseded one.
                doc.saveAs(str(new_path))
                try:
                    existing_path.unlink()
                except OSError:
                    pass
            else:
                doc.saveAs(str(new_path))

            return new_path
        except Exception as e:
            FreeCAD.Console.PrintError(f"Save failed: {e}\n")
            return None

    def create_document_for_item(self, item: Dict[str, Any], save: bool = True):
        """Create a new FreeCAD document for a database item.

        Assembly items get an ``Assembly::AssemblyObject`` root (falling back
        to ``App::Part`` if that fails); all other items get an ``App::Part``
        root containing a ``PartDesign::Body``. The root is stamped with the
        Silo tracking properties.

        Returns:
            The new document, or ``None`` when the item has no part number.
        """
        part_number = item.get("part_number", "")
        description = item.get("description", "")
        item_type = item.get("item_type", "part")

        if not part_number:
            return None

        doc = FreeCAD.newDocument(part_number)
        safe_name = "_" + part_number
        silo_props = self._silo_properties(item, part_number, item_type)

        if item_type == "assembly":
            # Create an Assembly object for assembly items (FreeCAD 1.0+)
            try:
                assembly_obj = doc.addObject("Assembly::AssemblyObject", safe_name)
                assembly_obj.Label = part_number
                set_silo_properties(assembly_obj, silo_props)
            except Exception as e:
                # Fallback to App::Part if Assembly workbench not available
                FreeCAD.Console.PrintWarning(
                    f"Assembly workbench not available, using App::Part: {e}\n"
                )
                part_obj = doc.addObject("App::Part", safe_name)
                part_obj.Label = part_number
                set_silo_properties(part_obj, silo_props)
        else:
            # Create a Part container for non-assembly items
            part_obj = doc.addObject("App::Part", safe_name)
            part_obj.Label = part_number
            set_silo_properties(part_obj, silo_props)

            # Add a Body for parts (not assemblies)
            body_label = _sanitize_filename(description) if description else "Body"
            body = doc.addObject("PartDesign::Body", "_" + body_label)
            body.Label = body_label
            part_obj.addObject(body)

        doc.recompute()

        if save:
            file_path = get_cad_file_path(part_number, description)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            doc.saveAs(str(file_path))

        return doc

    def create_document_from_template(
        self, item: Dict[str, Any], template_path: str, save: bool = True
    ):
        """Create a new document by copying a template .kc and stamping Silo properties.

        Falls back to :meth:`create_document_for_item` if *template_path* is
        missing or invalid.

        Returns:
            The opened document, or ``None`` if FreeCAD could not open the copy.
        """
        import shutil

        part_number = item.get("part_number", "")
        description = item.get("description", "")
        item_type = item.get("item_type", "part")

        # Test truthiness first: os.path.isfile(None) raises TypeError rather
        # than returning False, which would defeat the documented fallback.
        if not part_number or not template_path or not os.path.isfile(template_path):
            return self.create_document_for_item(item, save=save)

        dest_path = get_cad_file_path(part_number, description)
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        shutil.copy2(template_path, str(dest_path))
        self._clean_template_zip(str(dest_path))

        doc = FreeCAD.openDocument(str(dest_path))
        if not doc:
            return None

        # Find the root container (first App::Part or Assembly)
        root_obj = None
        for obj in doc.Objects:
            if obj.TypeId in ("App::Part", "Assembly::AssemblyObject"):
                root_obj = obj
                break
        if root_obj is None and doc.Objects:
            root_obj = doc.Objects[0]

        if root_obj:
            root_obj.Label = part_number
            set_silo_properties(
                root_obj, self._silo_properties(item, part_number, item_type)
            )

        doc.recompute()

        if save:
            doc.save()

        return doc

    @staticmethod
    def _clean_template_zip(filepath: str):
        """Strip ``silo/template.json`` and ``silo/manifest.json`` from a copied template.

        The manifest is auto-recreated by ``kc_format.py`` on next save.

        The archive is rewritten to ``{filepath}.tmp`` and swapped into place;
        on any failure the temp file is removed and a warning is printed,
        leaving the original file as it was.
        """
        import zipfile

        tmp_path = filepath + ".tmp"
        try:
            with zipfile.ZipFile(filepath, "r") as zf_in:
                with zipfile.ZipFile(tmp_path, "w") as zf_out:
                    for entry in zf_in.infolist():
                        if entry.filename in (
                            "silo/template.json",
                            "silo/manifest.json",
                        ):
                            continue
                        # Read by ZipInfo so duplicate names resolve to this
                        # exact entry.
                        zf_out.writestr(entry, zf_in.read(entry))
            os.replace(tmp_path, filepath)
        except Exception as exc:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            FreeCAD.Console.PrintWarning(f"Failed to clean template ZIP: {exc}\n")

    def open_item(self, part_number: str):
        """Open or create item document.

        Opens the existing local file when one is found; otherwise fetches
        the item from the server and creates (and saves) a new document.
        Returns the document, or ``None`` on failure.
        """
        existing_path = find_file_by_part_number(part_number)

        if existing_path and existing_path.exists():
            return FreeCAD.openDocument(str(existing_path))

        try:
            item = self.client.get_item(part_number)
            return self.create_document_for_item(item, save=True)
        except Exception as e:
            FreeCAD.Console.PrintError(f"Failed to open: {e}\n")
            return None

    def upload_file(
        self, part_number: str, file_path: str, comment: str = "Auto-save"
    ) -> Optional[Dict]:
        """Upload file to the server.

        The file is opened in FreeCAD to collect its object properties,
        closed again, then uploaded with those properties and *comment*.
        Returns the client's upload result, or ``None`` on failure.
        """
        try:
            doc = FreeCAD.openDocument(file_path)
            if not doc:
                return None

            try:
                properties = collect_document_properties(doc)
            finally:
                # Close even when collection fails so the document is not
                # left open behind the user's back.
                FreeCAD.closeDocument(doc.Name)

            return self.client._upload_file(part_number, file_path, properties, comment)
        except Exception as e:
            FreeCAD.Console.PrintError(f"Upload failed: {e}\n")
            return None

    def download_file(self, part_number: str) -> Optional[Path]:
        """Download the latest revision file from the server.

        Revisions are tried in the order the server returns them; the first
        one with a file that downloads successfully wins. Returns the local
        path, or ``None`` if nothing could be downloaded.
        """
        try:
            item = self.client.get_item(part_number)
            file_path = get_cad_file_path(part_number, item.get("description", ""))
            file_path.parent.mkdir(parents=True, exist_ok=True)

            revisions = self.client.get_revisions(part_number)
            for rev in revisions:
                if rev.get("file_key"):
                    if self.client._download_file(
                        part_number, rev["revision_number"], str(file_path)
                    ):
                        return file_path

            return None
        except Exception as e:
            FreeCAD.Console.PrintError(f"Download failed: {e}\n")
            return None


_sync = SiloSync()
--------------------------------------------------------------------------- # kindred:// URL handler # --------------------------------------------------------------------------- def handle_kindred_url(url: str): """Handle a ``kindred://`` URL by opening the referenced item. URL format:: kindred://item/{part_number} kindred://item/{part_number}/revision/{rev_number} Called from C++ ``MainWindow::processMessages()`` when a ``kindred://`` URL arrives via IPC, or from ``InitGui.py`` for cold-start URL arguments. """ from urllib.parse import urlparse parsed = urlparse(url) if parsed.scheme != "kindred": return # urlparse treats "kindred://item/PN-001" as netloc="item", path="/PN-001" parts = [parsed.netloc] + [p for p in parsed.path.split("/") if p] if len(parts) >= 2 and parts[0] == "item": part_number = parts[1] FreeCAD.Console.PrintMessage( f"Silo: Opening item {part_number} from kindred:// URL\n" ) _sync.open_item(part_number) # ============================================================================ # COMMANDS # ============================================================================ class Silo_Open: """Open item - combined search and open dialog.""" def GetResources(self): return { "MenuText": "Open", "ToolTip": "Search and open items (Ctrl+O)", "Pixmap": _icon("open"), } def Activated(self): from open_search import OpenItemWidget from PySide import QtGui, QtWidgets mw = FreeCADGui.getMainWindow() mdi = mw.findChild(QtWidgets.QMdiArea) if not mdi: return widget = OpenItemWidget(_client, search_local_files) sw = mdi.addSubWindow(widget) sw.setWindowTitle("Open Item") sw.setWindowIcon(QtGui.QIcon(_icon("open"))) sw.show() mdi.setActiveSubWindow(sw) def _on_selected(data): sw.close() if data.get("path"): FreeCAD.openDocument(data["path"]) else: _sync.open_item(data["part_number"]) widget.item_selected.connect(_on_selected) widget.cancelled.connect(sw.close) def IsActive(self): return True class Silo_New: """Create new item with part number. 
Opens a pre-document MDI tab containing the schema-driven creation form. Each invocation opens a new tab so multiple items can be prepared in parallel. On successful creation the tab closes and the real document opens in its place. """ def GetResources(self): return { "MenuText": "New", "ToolTip": "Create new item (Ctrl+N)", "Pixmap": _icon("new"), } def Activated(self): from PySide import QtGui, QtWidgets from schema_form import SchemaFormWidget mw = FreeCADGui.getMainWindow() mdi = mw.findChild(QtWidgets.QMdiArea) if not mdi: return # Each invocation creates a new pre-document tab form = SchemaFormWidget(_client) # Pre-fill description from current selection sel = FreeCADGui.Selection.getSelection() if sel: form._desc_edit.setText(sel[0].Label) # Add as MDI subwindow (appears as a tab alongside documents) sw = mdi.addSubWindow(form) sw.setWindowTitle("New Item") sw.setWindowIcon(QtGui.QIcon(_icon("new"))) sw.show() mdi.setActiveSubWindow(sw) # On creation: process result, close tab, open real document def _on_created(result): part_number = result["part_number"] try: sel_now = FreeCADGui.Selection.getSelection() if sel_now: obj = sel_now[0] set_silo_properties( obj, { "SiloItemId": result.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": 1, }, ) obj.Label = part_number _sync.save_to_canonical_path( FreeCAD.ActiveDocument, force_rename=True ) else: template_path = result.get("_form_data", {}).get("template_path") if template_path: _sync.create_document_from_template( result, template_path, save=True ) else: _sync.create_document_for_item(result, save=True) FreeCAD.Console.PrintMessage(f"Created: {part_number}\n") except Exception as e: FreeCAD.Console.PrintError(f"Failed to process created item: {e}\n") # Close the pre-document tab sw.close() form.item_created.connect(_on_created) form.cancelled.connect(sw.close) def IsActive(self): return _server_mode == "normal" def _push_dag_after_upload(doc, part_number, revision_number): """Extract and push the 
feature DAG after a successful upload. Failures are logged as warnings -- DAG sync must never block save. """ try: from dag import extract_dag nodes, edges = extract_dag(doc) if not nodes: return result = _client.push_dag(part_number, revision_number, nodes, edges) node_count = result.get("node_count", len(nodes)) edge_count = result.get("edge_count", len(edges)) FreeCAD.Console.PrintMessage( f"DAG synced: {node_count} nodes, {edge_count} edges\n" ) except Exception as e: FreeCAD.Console.PrintWarning(f"DAG sync failed: {e}\n") def _update_manifest_revision(file_path, revision_number): """Write revision_hash into the .kc manifest after a successful upload. Failures are logged as warnings -- must never block save. """ try: from kc_format import update_manifest_fields update_manifest_fields( file_path, { "revision_hash": str(revision_number), }, ) except Exception as e: FreeCAD.Console.PrintWarning(f"Manifest revision update failed: {e}\n") def _push_bom_after_upload(doc, part_number, revision_number): """Extract and sync Assembly BOM after a successful upload. Only runs for Assembly documents with cross-document links. Failures are logged as warnings -- BOM sync must never block save. 
""" try: from bom_sync import sync_bom_after_upload result = sync_bom_after_upload(doc, part_number, _client) if result is None: return # Not an assembly or no cross-doc links parts = [] if result.added_count: parts.append(f"+{result.added_count} added") if result.updated_count: parts.append(f"~{result.updated_count} qty updated") if result.unreferenced_count: parts.append(f"!{result.unreferenced_count} unreferenced") if result.unresolved_count: FreeCAD.Console.PrintWarning( f"BOM sync: {result.unresolved_count} components " f"have no Silo part number\n" ) if parts: FreeCAD.Console.PrintMessage(f"BOM synced: {', '.join(parts)}\n") else: FreeCAD.Console.PrintMessage("BOM synced: no changes\n") for err in result.errors: FreeCAD.Console.PrintWarning(f"BOM sync error: {err}\n") except Exception as e: FreeCAD.Console.PrintWarning(f"BOM sync failed: {e}\n") class Silo_Save: """Save locally and upload to the server.""" def GetResources(self): return { "MenuText": "Save", "ToolTip": "Save locally and upload to server (Ctrl+S)", "Pixmap": _icon("save"), } def Activated(self): doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) # If not tracked, just do a regular FreeCAD save if not obj: if doc.FileName: doc.save() FreeCAD.Console.PrintMessage(f"Saved: {doc.FileName}\n") else: FreeCADGui.runCommand("Std_SaveAs", 0) return part_number = obj.SiloPartNumber # Check if document has unsaved changes gui_doc = FreeCADGui.getDocument(doc.Name) is_modified = gui_doc.Modified if gui_doc else True if gui_doc and not is_modified and doc.FileName: FreeCAD.Console.PrintMessage("No changes to save.\n") return # Collect properties BEFORE saving to avoid dirtying the document # (accessing Shape properties can trigger recompute) properties = collect_document_properties(doc) # Save locally file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: # Fallback to regular save if canonical path 
fails if doc.FileName: doc.save() file_path = Path(doc.FileName) else: FreeCAD.Console.PrintError("Could not determine save path\n") return # Force clear modified flag if save succeeded (needed for assemblies) if gui_doc and gui_doc.Modified: try: gui_doc.Modified = False except Exception: pass FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n") # Try to upload to server try: result = _client._upload_file( part_number, str(file_path), properties, "Auto-save" ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n") _push_dag_after_upload(doc, part_number, new_rev) _push_bom_after_upload(doc, part_number, new_rev) _update_manifest_revision(str(file_path), new_rev) except Exception as e: FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n") FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n") def IsActive(self): return FreeCAD.ActiveDocument is not None and _server_mode == "normal" class Silo_Commit: """Save as new revision with comment.""" def GetResources(self): return { "MenuText": "Commit", "ToolTip": "Save as new revision with comment (Ctrl+Shift+S)", "Pixmap": _icon("commit"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: FreeCAD.Console.PrintError( "No tracked object. 
Use 'New' to register first.\n" ) return part_number = obj.SiloPartNumber comment, ok = QtGui.QInputDialog.getText(None, "Commit", "Revision comment:") if not ok: return # Collect properties BEFORE saving to avoid dirtying the document properties = collect_document_properties(doc) try: file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: return result = _client._upload_file( part_number, str(file_path), properties, comment ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n") _push_dag_after_upload(doc, part_number, new_rev) _push_bom_after_upload(doc, part_number, new_rev) _update_manifest_revision(str(file_path), new_rev) except Exception as e: FreeCAD.Console.PrintError(f"Commit failed: {e}\n") def IsActive(self): return FreeCAD.ActiveDocument is not None and _server_mode == "normal" def _check_pull_conflicts(part_number, local_path, doc=None): """Check for conflicts between local file and server state. Returns a list of conflict description strings, or an empty list if clean. """ conflicts = [] # Check for unsaved changes in an open document if doc is not None: gui_doc = FreeCADGui.getDocument(doc.Name) if doc.Name else None if gui_doc and gui_doc.Modified: conflicts.append("Document has unsaved local changes.") # Check local revision vs server latest if doc is not None: obj = get_tracked_object(doc) if obj and hasattr(obj, "SiloRevision"): local_rev = getattr(obj, "SiloRevision", 0) latest = _client.latest_file_revision(part_number) if latest and local_rev and latest["revision_number"] > local_rev: conflicts.append( f"Local file is at revision {local_rev}, " f"server has revision {latest['revision_number']}." 
) # Check local file mtime vs server timestamp if local_path and local_path.exists(): import datetime local_mtime = datetime.datetime.fromtimestamp( local_path.stat().st_mtime, tz=datetime.timezone.utc ) try: item = _client.get_item(part_number) server_updated = item.get("updated_at", "") if server_updated: # Parse ISO format timestamp server_dt = datetime.datetime.fromisoformat( server_updated.replace("Z", "+00:00") ) if server_dt > local_mtime: conflicts.append("Server version is newer than local file.") except Exception: pass return conflicts class SiloPullDialog: """Dialog for selecting which revision to pull.""" def __init__(self, part_number, revisions, parent=None): from PySide import QtCore, QtGui self._selected_revision = None self._dialog = QtGui.QDialog(parent) self._dialog.setWindowTitle(f"Pull - {part_number}") self._dialog.setMinimumWidth(600) self._dialog.setMinimumHeight(350) layout = QtGui.QVBoxLayout(self._dialog) info = QtGui.QLabel(f"Select a revision to download for {part_number}:") layout.addWidget(info) # Revision table self._table = QtGui.QTableWidget() self._table.setColumnCount(5) self._table.setHorizontalHeaderLabels( ["Rev", "Date", "Comment", "Status", "File"] ) self._table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) self._table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) self._table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) self._table.verticalHeader().setVisible(False) header = self._table.horizontalHeader() header.setStretchLastSection(True) # Populate rows file_revisions = [] self._table.setRowCount(len(revisions)) for i, rev in enumerate(revisions): rev_num = rev.get("revision_number", "") date = rev.get("created_at", "")[:16].replace("T", " ") comment = rev.get("comment", "") status = rev.get("status", "") has_file = "\u2713" if rev.get("file_key") else "" self._table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev_num))) self._table.setItem(i, 1, QtGui.QTableWidgetItem(date)) 
self._table.setItem(i, 2, QtGui.QTableWidgetItem(comment)) self._table.setItem(i, 3, QtGui.QTableWidgetItem(status)) file_item = QtGui.QTableWidgetItem(has_file) file_item.setTextAlignment(QtCore.Qt.AlignCenter) self._table.setItem(i, 4, file_item) if rev.get("file_key"): file_revisions.append(i) self._table.resizeColumnsToContents() layout.addWidget(self._table) # Pre-select the latest revision with a file if file_revisions: self._table.selectRow(file_revisions[0]) # Store revision data for lookup self._revisions = revisions # Buttons btn_layout = QtGui.QHBoxLayout() btn_layout.addStretch() download_btn = QtGui.QPushButton("Download") cancel_btn = QtGui.QPushButton("Cancel") download_btn.clicked.connect(self._on_download) cancel_btn.clicked.connect(self._dialog.reject) self._table.doubleClicked.connect(self._on_download) btn_layout.addWidget(download_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) def _on_download(self): row = self._table.currentRow() if row < 0: return rev = self._revisions[row] if not rev.get("file_key"): from PySide import QtGui QtGui.QMessageBox.information( self._dialog, "Pull", "Selected revision has no file attached." ) return self._selected_revision = rev self._dialog.accept() def exec_(self): if self._dialog.exec_() == 1: # QDialog.Accepted return self._selected_revision return None def _pull_dependencies(part_number, progress_callback=None): """Recursively pull all BOM children that have files on the server. Returns list of (part_number, dest_path) tuples for successfully pulled files. Skips children that already exist locally. 
""" pulled = [] try: bom = _client.get_bom(part_number) except Exception as e: FreeCAD.Console.PrintWarning(f"Could not fetch BOM for {part_number}: {e}\n") return pulled for entry in bom: child_pn = entry.get("child_part_number") if not child_pn: continue # Skip if already exists locally existing = find_file_by_part_number(child_pn) if existing and existing.exists(): FreeCAD.Console.PrintMessage( f" {child_pn}: already exists at {existing}\n" ) # Still recurse — this child may itself be an assembly with missing deps _pull_dependencies(child_pn, progress_callback) continue # Check if this child has a file on the server try: latest = _client.latest_file_revision(child_pn) except Exception: latest = None if not latest or not latest.get("file_key"): FreeCAD.Console.PrintMessage(f" {child_pn}: no file on server, skipping\n") continue # Determine destination path child_desc = entry.get("child_description", "") dest_path = get_cad_file_path(child_pn, child_desc) dest_path.parent.mkdir(parents=True, exist_ok=True) rev_num = latest["revision_number"] FreeCAD.Console.PrintMessage(f" Pulling {child_pn} rev {rev_num}...\n") try: ok = _client._download_file( child_pn, rev_num, str(dest_path), progress_callback=progress_callback ) if ok: pulled.append((child_pn, dest_path)) except Exception as e: FreeCAD.Console.PrintWarning(f" Failed to pull {child_pn}: {e}\n") # Recurse into child (it may be a sub-assembly) _pull_dependencies(child_pn, progress_callback) return pulled class Silo_Pull: """Download revision file from the server.""" def GetResources(self): return { "MenuText": "Pull", "ToolTip": "Download file with revision selection", "Pixmap": _icon("pull"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument part_number = None obj = None if doc: obj = get_tracked_object(doc) if obj: part_number = obj.SiloPartNumber if not part_number: part_number, ok = QtGui.QInputDialog.getText(None, "Pull", "Part number:") if not ok or not part_number: return 
part_number = part_number.strip().upper() # Fetch revisions from server try: revisions = _client.get_revisions(part_number) except Exception as e: QtGui.QMessageBox.warning(None, "Pull", f"Cannot reach server: {e}") return existing_local = find_file_by_part_number(part_number) # If no revisions have files, fall back to create-from-database has_any_file = any(r.get("file_key") for r in revisions) if not has_any_file: if existing_local: FreeCAD.Console.PrintMessage( f"Opening existing local file: {existing_local}\n" ) FreeCAD.openDocument(str(existing_local)) else: try: item = _client.get_item(part_number) new_doc = _sync.create_document_for_item(item, save=True) if new_doc: FreeCAD.Console.PrintMessage( f"Created local file for {part_number}\n" ) else: QtGui.QMessageBox.warning( None, "Pull", f"Failed to create document for {part_number}", ) except Exception as e: QtGui.QMessageBox.warning(None, "Pull", f"Failed: {e}") return # Conflict detection conflicts = _check_pull_conflicts(part_number, existing_local, doc) if conflicts: detail = "\n".join(f" - {c}" for c in conflicts) reply = QtGui.QMessageBox.warning( None, "Pull - Conflicts Detected", f"Potential conflicts found:\n{detail}\n\n" "Download anyway and overwrite local file?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return # Show revision selection dialog dlg = SiloPullDialog(part_number, revisions) selected = dlg.exec_() if selected is None: return rev_num = selected["revision_number"] # Determine destination path try: item = _client.get_item(part_number) except Exception: item = {} dest_path = get_cad_file_path(part_number, item.get("description", "")) dest_path.parent.mkdir(parents=True, exist_ok=True) # Download with progress progress = QtGui.QProgressDialog( f"Downloading {part_number} rev {rev_num}...", "Cancel", 0, 100 ) progress.setWindowModality(QtCore.Qt.WindowModal) progress.setMinimumDuration(0) progress.setValue(0) def on_progress(downloaded, total): if 
class Silo_Push:
    """Upload local files to the server."""

    def GetResources(self):
        """Return the FreeCAD command resources (menu text, tooltip, icon)."""
        return {
            "MenuText": "Push",
            "ToolTip": "Upload local files that aren't on the server",
            "Pixmap": _icon("push"),
        }

    @staticmethod
    def _needs_upload(path, server_rev):
        """Decide whether the local file at *path* should be uploaded.

        Args:
            path: Filesystem path of the local file.
            server_rev: Latest file revision record from the server (a mapping
                with an ISO-8601 ``created_at`` value), or a falsy value when
                the server has no file.

        Returns:
            ``True`` when the server has no file, when the server timestamp is
            missing or unparseable, when the local file cannot be inspected,
            or when the local modification time is later than the server
            revision timestamp. ``False`` otherwise.
        """
        from datetime import datetime, timezone

        if not server_rev:
            return True  # No file on server at all
        try:
            server_time_str = server_rev.get("created_at", "")
            if not server_time_str:
                return True  # No server time to compare against
            server_dt = datetime.fromisoformat(server_time_str.replace("Z", "+00:00"))
            if server_dt.tzinfo is None:
                # A timestamp without an offset cannot be compared with the
                # aware local time; it is assumed here to be UTC.
                server_dt = server_dt.replace(tzinfo=timezone.utc)
            local_dt = datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)
        except (AttributeError, TypeError, ValueError, OSError, OverflowError):
            # On any comparison error, err on the side of uploading
            return True
        return local_dt > server_dt

    def Activated(self):
        """Find local files missing or outdated on the server and upload them."""
        from PySide import QtGui

        # Find files that need uploading (no server file, or local is newer)
        unuploaded = []
        for lf in search_local_files():
            pn = lf["part_number"]
            try:
                _client.get_item(pn)  # Check if in DB
                server_rev = _client.latest_file_revision(pn)
            except Exception:
                continue  # Not in DB, skip
            if self._needs_upload(lf["path"], server_rev):
                unuploaded.append(lf)

        if not unuploaded:
            QtGui.QMessageBox.information(
                None, "Push", "All local files are already uploaded."
            )
            return

        lines = [f"Found {len(unuploaded)} files to upload:\n\n"]
        lines.extend(f" {entry['part_number']}\n" for entry in unuploaded[:10])
        if len(unuploaded) > 10:
            lines.append(f" ... and {len(unuploaded) - 10} more\n")
        lines.append("\nUpload?")
        msg = "".join(lines)

        reply = QtGui.QMessageBox.question(
            None, "Push", msg, QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
        )
        if reply != QtGui.QMessageBox.Yes:
            return

        uploaded = 0
        for entry in unuploaded:
            try:
                result = _sync.upload_file(
                    entry["part_number"], entry["path"], "Synced from local"
                )
            except Exception as e:
                # One failed upload must not abort the rest of the batch
                FreeCAD.Console.PrintWarning(
                    f"Failed to upload {entry['part_number']}: {e}\n"
                )
                continue
            if result:
                uploaded += 1

        QtGui.QMessageBox.information(None, "Push", f"Uploaded {uploaded} files.")

    def IsActive(self):
        """Enable the command only while the server is in normal mode."""
        return _server_mode == "normal"

{part_number}

" msg += f"

Type: {item.get('item_type', '-')}

" msg += f"

Description: {item.get('description', '-')}

" msg += f"

Projects: {', '.join(project_codes) if project_codes else 'None'}

" msg += f"

Current Revision: {item.get('current_revision', 1)}

" msg += f"

Local Revision: {getattr(obj, 'SiloRevision', '-')}

" has_file, _ = _client.has_file(part_number) msg += f"

File on Server: {'Yes' if has_file else 'No'}

" # Show current revision status if revisions: current_status = revisions[0].get("status", "draft") current_labels = revisions[0].get("labels", []) msg += f"

Current Status: {current_status}

" if current_labels: msg += f"

Labels: {', '.join(current_labels)}

" msg += "

Revision History

" msg += "" for rev in revisions: file_icon = "✓" if rev.get("file_key") else "-" comment = rev.get("comment", "") or "-" date = rev.get("created_at", "")[:10] status = rev.get("status", "draft") msg += f"" msg += "
RevStatusDateFileComment
{rev['revision_number']}{status}{date}{file_icon}{comment}
class Silo_TagProjects:
    """Manage project tags for an item."""

    def GetResources(self):
        """Return the FreeCAD command resources (menu text, tooltip, icon)."""
        return {
            "MenuText": "Tag Projects",
            "ToolTip": "Add or remove project tags for an item",
            "Pixmap": _icon("tag"),
        }

    @staticmethod
    def _plan_tag_changes(selected, current_codes):
        """Split a dialog selection into tags to add and tags left untouched.

        Args:
            selected: Project codes selected in the dialog, in display order.
            current_codes: Project codes the item is already tagged with.

        Returns:
            ``(to_add, deselected)`` where ``to_add`` lists selected codes the
            item does not have yet (selection order preserved) and
            ``deselected`` is the sorted list of existing codes that were not
            selected.
        """
        chosen = set(selected)
        to_add = [code for code in selected if code not in current_codes]
        deselected = sorted(code for code in current_codes if code not in chosen)
        return to_add, deselected

    def Activated(self):
        """Show a multi-select dialog and add the newly selected project tags."""
        from PySide import QtGui

        doc = FreeCAD.ActiveDocument
        if not doc:
            FreeCAD.Console.PrintError("No active document\n")
            return
        obj = get_tracked_object(doc)
        if not obj:
            FreeCAD.Console.PrintError("No tracked object\n")
            return
        part_number = obj.SiloPartNumber

        try:
            # Get current projects for item
            current_projects = _client.get_item_projects(part_number)
            current_codes = {
                p.get("code", "") for p in current_projects if p.get("code")
            }

            # Get all available projects
            all_projects = _client.get_projects()
            all_codes = [p.get("code", "") for p in all_projects if p.get("code")]
            if not all_codes:
                QtGui.QMessageBox.information(
                    None,
                    "Tag Projects",
                    "No projects available. Create projects first.",
                )
                return

            # Multi-select dialog
            dialog = QtGui.QDialog()
            dialog.setWindowTitle(f"Tag Projects for {part_number}")
            dialog.setMinimumWidth(350)
            layout = QtGui.QVBoxLayout(dialog)

            label = QtGui.QLabel("Select projects to associate with this item:")
            layout.addWidget(label)

            list_widget = QtGui.QListWidget()
            list_widget.setSelectionMode(QtGui.QAbstractItemView.MultiSelection)
            for code in all_codes:
                list_item = QtGui.QListWidgetItem(code)
                list_widget.addItem(list_item)
                if code in current_codes:
                    list_item.setSelected(True)
            layout.addWidget(list_widget)

            btn_layout = QtGui.QHBoxLayout()
            cancel_btn = QtGui.QPushButton("Cancel")
            save_btn = QtGui.QPushButton("Save")
            btn_layout.addWidget(cancel_btn)
            btn_layout.addWidget(save_btn)
            layout.addLayout(btn_layout)

            cancel_btn.clicked.connect(dialog.reject)
            save_btn.clicked.connect(dialog.accept)

            if dialog.exec_() != QtGui.QDialog.Accepted:
                return

            selected = [entry.text() for entry in list_widget.selectedItems()]
            to_add, deselected = self._plan_tag_changes(selected, current_codes)

            if to_add:
                _client.add_item_projects(part_number, to_add)
                msg = f"Updated project tags for {part_number}"
                msg += f"\nAdded: {', '.join(to_add)}"
            else:
                # Do not claim an update when nothing was sent to the server
                msg = f"No new project tags added for {part_number}"

            # Removing a tag would need a separate API call per project, which
            # this dialog does not make; tell the user instead of silently
            # ignoring the deselection.
            if deselected:
                msg += (
                    "\nNot removed (removing tags is not supported here): "
                    f"{', '.join(deselected)}"
                )

            QtGui.QMessageBox.information(None, "Tag Projects", msg)
            FreeCAD.Console.PrintMessage(f"{msg}\n")
        except Exception as e:
            QtGui.QMessageBox.warning(None, "Tag Projects", f"Failed: {e}")

    def IsActive(self):
        """Enable the command whenever a document is open."""
        return FreeCAD.ActiveDocument is not None
part_number = obj.SiloPartNumber try: revisions = _client.get_revisions(part_number) if len(revisions) < 2: QtGui.QMessageBox.information( None, "Rollback", "No previous revisions to rollback to." ) return # Build revision list for selection (exclude current/latest) current_rev = revisions[0]["revision_number"] prev_revisions = revisions[1:] # All except latest # Create selection dialog dialog = QtGui.QDialog() dialog.setWindowTitle(f"Rollback {part_number}") dialog.setMinimumWidth(500) dialog.setMinimumHeight(300) layout = QtGui.QVBoxLayout(dialog) label = QtGui.QLabel( f"Select a revision to rollback to (current: Rev {current_rev}):" ) layout.addWidget(label) # Revision table table = QtGui.QTableWidget() table.setColumnCount(4) table.setHorizontalHeaderLabels(["Rev", "Status", "Date", "Comment"]) table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) table.setRowCount(len(prev_revisions)) table.horizontalHeader().setStretchLastSection(True) for i, rev in enumerate(prev_revisions): table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev["revision_number"]))) table.setItem(i, 1, QtGui.QTableWidgetItem(rev.get("status", "draft"))) table.setItem( i, 2, QtGui.QTableWidgetItem(rev.get("created_at", "")[:10]) ) table.setItem( i, 3, QtGui.QTableWidgetItem(rev.get("comment", "") or "") ) table.resizeColumnsToContents() layout.addWidget(table) # Comment field comment_label = QtGui.QLabel("Rollback comment (optional):") layout.addWidget(comment_label) comment_input = QtGui.QLineEdit() comment_input.setPlaceholderText("Reason for rollback...") layout.addWidget(comment_input) # Buttons btn_layout = QtGui.QHBoxLayout() cancel_btn = QtGui.QPushButton("Cancel") rollback_btn = QtGui.QPushButton("Rollback") btn_layout.addStretch() btn_layout.addWidget(cancel_btn) btn_layout.addWidget(rollback_btn) layout.addLayout(btn_layout) selected_rev = [None] def on_rollback(): selected = table.selectedItems() if not selected: 
class Silo_SetStatus:
    """Set revision status (draft, review, released, obsolete)."""

    # Statuses offered in the selection dialog, in display order.
    _STATUSES = ("draft", "review", "released", "obsolete")

    def GetResources(self):
        """Return the FreeCAD command resources (menu text, tooltip, icon)."""
        return {
            "MenuText": "Set Status",
            "ToolTip": "Set the status of the current revision",
            "Pixmap": _icon("status"),
        }

    @classmethod
    def _initial_status_index(cls, current_status):
        """Return the dialog index to preselect for *current_status*.

        Falls back to the first entry when the server reports a status that is
        not one of the offered choices, so the dialog can still be shown.
        """
        try:
            return cls._STATUSES.index(current_status)
        except ValueError:
            return 0

    def Activated(self):
        """Ask for a new status for the latest revision and send it to the server."""
        from PySide import QtGui

        doc = FreeCAD.ActiveDocument
        if not doc:
            FreeCAD.Console.PrintError("No active document\n")
            return
        obj = get_tracked_object(doc)
        if not obj:
            FreeCAD.Console.PrintError("No tracked object\n")
            return
        part_number = obj.SiloPartNumber

        try:
            # Get current revision info (the first entry is used as current)
            revisions = _client.get_revisions(part_number)
            current_rev = revisions[0] if revisions else None
            if not current_rev:
                QtGui.QMessageBox.warning(None, "Set Status", "No revisions found")
                return

            current_status = current_rev.get("status", "draft")
            rev_num = current_rev["revision_number"]

            # Status selection
            statuses = list(self._STATUSES)
            status, ok = QtGui.QInputDialog.getItem(
                None,
                "Set Revision Status",
                f"Set status for Rev {rev_num} (current: {current_status}):",
                statuses,
                self._initial_status_index(current_status),
                False,
            )
            if not ok or status == current_status:
                return

            # Update status
            _client.update_revision(part_number, rev_num, status=status)
            FreeCAD.Console.PrintMessage(
                f"Updated Rev {rev_num} status to '{status}'\n"
            )
            QtGui.QMessageBox.information(
                None, "Status Updated", f"Revision {rev_num} status set to '{status}'"
            )
        except Exception as e:
            QtGui.QMessageBox.warning(None, "Set Status", f"Failed: {e}")

    def IsActive(self):
        """Enable the command whenever a document is open."""
        return FreeCAD.ActiveDocument is not None
) url_hint.setWordWrap(True) url_hint.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(url_hint) layout.addSpacing(10) # Schema name schema_label = QtGui.QLabel("Schema Name:") layout.addWidget(schema_label) schema_input = QtGui.QLineEdit() schema_input.setPlaceholderText("kindred-rd") current_schema = param.GetString("SchemaName", "") if current_schema: schema_input.setText(current_schema) else: env_schema = os.environ.get("SILO_SCHEMA", "") if env_schema: schema_input.setText(env_schema) layout.addWidget(schema_input) schema_hint = QtGui.QLabel( "The part-numbering schema to use. Leave empty for " "SILO_SCHEMA env var or default (kindred-rd)." ) schema_hint.setWordWrap(True) schema_hint.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(schema_hint) layout.addSpacing(10) # SSL ssl_checkbox = QtGui.QCheckBox("Verify SSL certificates") ssl_checkbox.setChecked(param.GetBool("SslVerify", True)) layout.addWidget(ssl_checkbox) ssl_hint = QtGui.QLabel( "Disable only for internal servers with self-signed certificates." ) ssl_hint.setWordWrap(True) ssl_hint.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(ssl_hint) layout.addSpacing(5) # Custom CA certificate cert_label = QtGui.QLabel("Custom CA certificate file:") layout.addWidget(cert_label) cert_row = QtGui.QHBoxLayout() cert_input = QtGui.QLineEdit() cert_input.setPlaceholderText("(Use system CA certificates)") current_cert = param.GetString("SslCertPath", "") if current_cert: cert_input.setText(current_cert) cert_browse = QtGui.QPushButton("Browse...") cert_row.addWidget(cert_input) cert_row.addWidget(cert_browse) layout.addLayout(cert_row) cert_hint = QtGui.QLabel( "Path to a PEM/CRT file for internal CAs. Leave empty for system certificates only." 
) cert_hint.setWordWrap(True) cert_hint.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(cert_hint) def on_browse_cert(): path, _ = QtGui.QFileDialog.getOpenFileName( dialog, "Select CA Certificate", os.path.dirname(cert_input.text()) or os.path.expanduser("~"), "Certificates (*.pem *.crt *.cer);;All Files (*)", ) if path: cert_input.setText(path) cert_browse.clicked.connect(on_browse_cert) layout.addSpacing(10) # Authentication section auth_heading = QtGui.QLabel("Authentication") auth_heading.setTextFormat(QtCore.Qt.RichText) layout.addWidget(auth_heading) auth_user = _get_auth_username() auth_role = _get_auth_role() auth_source = _get_auth_source() has_token = bool(_get_auth_token()) if has_token and auth_user: auth_parts = [f"Logged in as {auth_user}"] if auth_role: auth_parts.append(f"(role: {auth_role})") if auth_source: auth_parts.append(f"via {auth_source}") auth_status_text = " ".join(auth_parts) else: auth_status_text = "Not logged in" auth_status_lbl = QtGui.QLabel(auth_status_text) auth_status_lbl.setTextFormat(QtCore.Qt.RichText) layout.addWidget(auth_status_lbl) # API token input token_label = QtGui.QLabel("API Token:") layout.addWidget(token_label) token_row = QtGui.QHBoxLayout() token_input = QtGui.QLineEdit() token_input.setEchoMode(QtGui.QLineEdit.Password) token_input.setPlaceholderText("silo_... 
(paste token or use Login)") current_token = param.GetString("ApiToken", "") if current_token: token_input.setText(current_token) token_row.addWidget(token_input) token_show_btn = QtGui.QToolButton() token_show_btn.setText("\U0001f441") token_show_btn.setCheckable(True) token_show_btn.setFixedSize(28, 28) token_show_btn.setToolTip("Show/hide token") def on_toggle_show(checked): if checked: token_input.setEchoMode(QtGui.QLineEdit.Normal) else: token_input.setEchoMode(QtGui.QLineEdit.Password) token_show_btn.toggled.connect(on_toggle_show) token_row.addWidget(token_show_btn) layout.addLayout(token_row) token_hint = QtGui.QLabel( "Paste an API token generated from the Silo web UI, " "or use Login in the Database Auth panel to create one " "automatically. Tokens can also be set via the " "SILO_API_TOKEN environment variable." ) token_hint.setWordWrap(True) token_hint.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(token_hint) layout.addSpacing(4) clear_auth_btn = QtGui.QPushButton("Clear Token and Logout") clear_auth_btn.setEnabled(has_token) def on_clear_auth(): _clear_auth() token_input.setText("") auth_status_lbl.setText("Not logged in") clear_auth_btn.setEnabled(False) FreeCAD.Console.PrintMessage("Silo: API token and credentials cleared\n") clear_auth_btn.clicked.connect(on_clear_auth) layout.addWidget(clear_auth_btn) layout.addSpacing(10) # Current effective values (read-only) cert_display = param.GetString("SslCertPath", "") or "(system defaults)" if has_token and auth_user: auth_display = f"{auth_user} ({auth_role or 'unknown role'})" if auth_source: auth_display += f" via {auth_source}" elif has_token: auth_display = "token configured (user unknown)" else: auth_display = "not configured" status_label = QtGui.QLabel( f"Active URL: {_get_api_url()}
" f"Schema: {_get_schema_name()}
" f"SSL verification: {'enabled' if _get_ssl_verify() else 'disabled'}
" f"CA certificate: {cert_display}
" f"Authentication: {auth_display}" ) status_label.setTextFormat(QtCore.Qt.RichText) layout.addWidget(status_label) layout.addStretch() # Buttons btn_layout = QtGui.QHBoxLayout() save_btn = QtGui.QPushButton("Save") cancel_btn = QtGui.QPushButton("Cancel") btn_layout.addStretch() btn_layout.addWidget(save_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) def on_save(): url = url_input.text().strip() param.SetString("ApiUrl", url) param.SetString("SchemaName", schema_input.text().strip()) param.SetBool("SslVerify", ssl_checkbox.isChecked()) cert_path = cert_input.text().strip() param.SetString("SslCertPath", cert_path) # Save API token if changed new_token = token_input.text().strip() old_token = param.GetString("ApiToken", "") if new_token != old_token: param.SetString("ApiToken", new_token) if new_token and not old_token: FreeCAD.Console.PrintMessage("Silo: API token configured\n") elif not new_token and old_token: _clear_auth() FreeCAD.Console.PrintMessage("Silo: API token removed\n") else: FreeCAD.Console.PrintMessage("Silo: API token updated\n") FreeCAD.Console.PrintMessage( f"Silo settings saved. 
URL: {_get_api_url()}, " f"SSL verify: {_get_ssl_verify()}, " f"Cert: {cert_path or '(system)'}\n" ) dialog.accept() save_btn.clicked.connect(on_save) cancel_btn.clicked.connect(dialog.reject) dialog.exec_() def IsActive(self): return True class Silo_BOM: """View and manage Bill of Materials for the current item.""" def GetResources(self): return { "MenuText": "BOM", "ToolTip": "View and manage Bill of Materials", "Pixmap": _icon("bom"), } def Activated(self): from PySide import QtCore, QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: reply = QtGui.QMessageBox.question( None, "BOM", "This document is not registered with Silo.\n\nRegister it now?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return FreeCADGui.runCommand("Silo_New") obj = get_tracked_object(doc) if not obj: return part_number = obj.SiloPartNumber try: item = _client.get_item(part_number) except Exception as e: QtGui.QMessageBox.warning(None, "BOM", f"Failed to get item info:\n{e}") return # Build the dialog dialog = QtGui.QDialog() dialog.setWindowTitle(f"BOM - {part_number}") dialog.setMinimumWidth(750) dialog.setMinimumHeight(450) layout = QtGui.QVBoxLayout(dialog) # Item header header = QtGui.QLabel(f"{part_number} - {item.get('description', '')}") layout.addWidget(header) # Tab widget tabs = QtGui.QTabWidget() layout.addWidget(tabs) # ── Tab 1: BOM (children of this item) ── bom_widget = QtGui.QWidget() bom_layout = QtGui.QVBoxLayout(bom_widget) bom_table = QtGui.QTableWidget() bom_table.setColumnCount(7) bom_table.setHorizontalHeaderLabels( ["Part Number", "Description", "Type", "Qty", "Unit", "Ref Des", "Rev"] ) bom_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) bom_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) bom_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) 
bom_table.horizontalHeader().setStretchLastSection(True) bom_layout.addWidget(bom_table) # BOM button bar bom_btn_layout = QtGui.QHBoxLayout() add_btn = QtGui.QPushButton("Add") edit_btn = QtGui.QPushButton("Edit") remove_btn = QtGui.QPushButton("Remove") bom_btn_layout.addWidget(add_btn) bom_btn_layout.addWidget(edit_btn) bom_btn_layout.addWidget(remove_btn) bom_btn_layout.addStretch() bom_layout.addLayout(bom_btn_layout) tabs.addTab(bom_widget, "BOM") # ── Tab 2: Where Used (parents of this item) ── wu_widget = QtGui.QWidget() wu_layout = QtGui.QVBoxLayout(wu_widget) wu_table = QtGui.QTableWidget() wu_table.setColumnCount(5) wu_table.setHorizontalHeaderLabels( ["Parent Part Number", "Type", "Qty", "Unit", "Ref Des"] ) wu_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) wu_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) wu_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) wu_table.horizontalHeader().setStretchLastSection(True) wu_layout.addWidget(wu_table) tabs.addTab(wu_widget, "Where Used") # ── Data loading ── bom_data = [] def load_bom(): nonlocal bom_data try: bom_data = _client.get_bom(part_number) except Exception as exc: FreeCAD.Console.PrintWarning(f"BOM load error: {exc}\n") bom_data = [] bom_table.setRowCount(len(bom_data)) for row, entry in enumerate(bom_data): bom_table.setItem( row, 0, QtGui.QTableWidgetItem(entry.get("child_part_number", "")) ) bom_table.setItem( row, 1, QtGui.QTableWidgetItem(entry.get("child_description", "")) ) bom_table.setItem( row, 2, QtGui.QTableWidgetItem(entry.get("rel_type", "")) ) qty = entry.get("quantity") bom_table.setItem( row, 3, QtGui.QTableWidgetItem(str(qty) if qty is not None else "") ) bom_table.setItem( row, 4, QtGui.QTableWidgetItem(entry.get("unit") or "") ) ref_des = entry.get("reference_designators") or [] bom_table.setItem(row, 5, QtGui.QTableWidgetItem(", ".join(ref_des))) bom_table.setItem( row, 6, QtGui.QTableWidgetItem(str(entry.get("effective_revision", ""))), 
) bom_table.resizeColumnsToContents() def load_where_used(): try: wu_data = _client.get_bom_where_used(part_number) except Exception as exc: FreeCAD.Console.PrintWarning(f"Where-used load error: {exc}\n") wu_data = [] wu_table.setRowCount(len(wu_data)) for row, entry in enumerate(wu_data): wu_table.setItem( row, 0, QtGui.QTableWidgetItem(entry.get("parent_part_number", "")) ) wu_table.setItem( row, 1, QtGui.QTableWidgetItem(entry.get("rel_type", "")) ) qty = entry.get("quantity") wu_table.setItem( row, 2, QtGui.QTableWidgetItem(str(qty) if qty is not None else "") ) wu_table.setItem( row, 3, QtGui.QTableWidgetItem(entry.get("unit") or "") ) ref_des = entry.get("reference_designators") or [] wu_table.setItem(row, 4, QtGui.QTableWidgetItem(", ".join(ref_des))) wu_table.resizeColumnsToContents() # ── Button handlers ── def on_add(): add_dlg = QtGui.QDialog(dialog) add_dlg.setWindowTitle("Add BOM Entry") add_dlg.setMinimumWidth(400) al = QtGui.QFormLayout(add_dlg) child_input = QtGui.QLineEdit() child_input.setPlaceholderText("e.g. F01-0001") al.addRow("Child Part Number:", child_input) type_combo = QtGui.QComboBox() type_combo.addItems(["component", "alternate", "reference"]) al.addRow("Relationship Type:", type_combo) qty_input = QtGui.QLineEdit() qty_input.setPlaceholderText("e.g. 4") al.addRow("Quantity:", qty_input) unit_input = QtGui.QLineEdit() unit_input.setPlaceholderText("e.g. ea, m, kg") al.addRow("Unit:", unit_input) refdes_input = QtGui.QLineEdit() refdes_input.setPlaceholderText("e.g. 
R1, R2, R3") al.addRow("Ref Designators:", refdes_input) btn_box = QtGui.QDialogButtonBox( QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel ) btn_box.accepted.connect(add_dlg.accept) btn_box.rejected.connect(add_dlg.reject) al.addRow(btn_box) if add_dlg.exec_() != QtGui.QDialog.Accepted: return child_pn = child_input.text().strip() if not child_pn: return qty = None qty_text = qty_input.text().strip() if qty_text: try: qty = float(qty_text) except ValueError: QtGui.QMessageBox.warning( dialog, "BOM", "Quantity must be a number." ) return unit = unit_input.text().strip() or None rel_type = type_combo.currentText() ref_des = None refdes_text = refdes_input.text().strip() if refdes_text: ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()] try: _client.add_bom_entry( part_number, child_pn, quantity=qty, unit=unit, rel_type=rel_type, ref_des=ref_des, ) load_bom() except Exception as exc: QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to add entry:\n{exc}") def on_edit(): selected = bom_table.selectedItems() if not selected: return row = selected[0].row() if row < 0 or row >= len(bom_data): return entry = bom_data[row] child_pn = entry.get("child_part_number", "") edit_dlg = QtGui.QDialog(dialog) edit_dlg.setWindowTitle(f"Edit BOM Entry - {child_pn}") edit_dlg.setMinimumWidth(400) el = QtGui.QFormLayout(edit_dlg) type_combo = QtGui.QComboBox() type_combo.addItems(["component", "alternate", "reference"]) current_type = entry.get("rel_type", "component") idx = type_combo.findText(current_type) if idx >= 0: type_combo.setCurrentIndex(idx) el.addRow("Relationship Type:", type_combo) qty_input = QtGui.QLineEdit() qty = entry.get("quantity") if qty is not None: qty_input.setText(str(qty)) el.addRow("Quantity:", qty_input) unit_input = QtGui.QLineEdit() unit_input.setText(entry.get("unit") or "") el.addRow("Unit:", unit_input) refdes_input = QtGui.QLineEdit() ref_des = entry.get("reference_designators") or [] refdes_input.setText(", ".join(ref_des)) 
el.addRow("Ref Designators:", refdes_input) btn_box = QtGui.QDialogButtonBox( QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel ) btn_box.accepted.connect(edit_dlg.accept) btn_box.rejected.connect(edit_dlg.reject) el.addRow(btn_box) if edit_dlg.exec_() != QtGui.QDialog.Accepted: return new_qty = None qty_text = qty_input.text().strip() if qty_text: try: new_qty = float(qty_text) except ValueError: QtGui.QMessageBox.warning( dialog, "BOM", "Quantity must be a number." ) return new_unit = unit_input.text().strip() or None new_type = type_combo.currentText() new_ref_des = None refdes_text = refdes_input.text().strip() if refdes_text: new_ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()] else: new_ref_des = [] try: _client.update_bom_entry( part_number, child_pn, quantity=new_qty, unit=new_unit, rel_type=new_type, ref_des=new_ref_des, ) load_bom() except Exception as exc: QtGui.QMessageBox.warning( dialog, "BOM", f"Failed to update entry:\n{exc}" ) def on_remove(): selected = bom_table.selectedItems() if not selected: return row = selected[0].row() if row < 0 or row >= len(bom_data): return entry = bom_data[row] child_pn = entry.get("child_part_number", "") reply = QtGui.QMessageBox.question( dialog, "Remove BOM Entry", f"Remove {child_pn} from BOM?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return try: _client.delete_bom_entry(part_number, child_pn) load_bom() except Exception as exc: QtGui.QMessageBox.warning( dialog, "BOM", f"Failed to remove entry:\n{exc}" ) add_btn.clicked.connect(on_add) edit_btn.clicked.connect(on_edit) remove_btn.clicked.connect(on_remove) # Close button close_layout = QtGui.QHBoxLayout() close_layout.addStretch() close_btn = QtGui.QPushButton("Close") close_btn.clicked.connect(dialog.accept) close_layout.addWidget(close_btn) layout.addLayout(close_layout) # Initial data load load_bom() load_where_used() dialog.exec_() def IsActive(self): return FreeCAD.ActiveDocument is not 
class SiloEventListener(QtCore.QThread):
    """Background thread that listens to Server-Sent Events from the Silo API.

    Emits Qt signals when items are updated or new revisions are created.
    Degrades gracefully if the server does not support the ``/api/events``
    endpoint.
    """

    item_updated = QtCore.Signal(str)  # part_number
    revision_created = QtCore.Signal(str, int)  # part_number, revision
    connection_status = QtCore.Signal(
        str, int, str
    )  # (status, retry_count, error_message)
    server_mode_changed = QtCore.Signal(str)  # "normal" / "read-only" / "offline"

    # DAG events
    dag_updated = QtCore.Signal(str, int, int)  # part_number, node_count, edge_count
    dag_validated = QtCore.Signal(str, bool, int)  # part_number, valid, failed_count

    # BOM events
    bom_merged = QtCore.Signal(str, int, int, int)  # pn, added, updated, unreferenced

    # Job lifecycle events
    job_created = QtCore.Signal(str, str, str)  # job_id, definition_name, part_number
    job_claimed = QtCore.Signal(str, str)  # job_id, runner_id
    job_progress = QtCore.Signal(str, int, str)  # job_id, progress, message
    job_completed = QtCore.Signal(str)  # job_id
    job_failed = QtCore.Signal(str, str)  # job_id, error
    job_cancelled = QtCore.Signal(str)  # job_id

    _MAX_RETRIES = 10
    _BASE_DELAY = 1  # seconds, doubles each retry
    _MAX_DELAY = 60  # seconds, backoff cap

    def __init__(self, parent=None):
        super().__init__(parent)
        self._stop_flag = False
        self._response = None

    # -- public API ---------------------------------------------------------

    def stop(self):
        """Request the thread to stop and wait up to five seconds for it."""
        self._stop_flag = True
        # Close the socket so readline() unblocks immediately. Use a local
        # reference because the listener thread may reset the attribute.
        response = self._response
        try:
            if response is not None:
                response.close()
        except Exception:
            pass
        self.wait(5000)

    # -- thread entry -------------------------------------------------------

    def run(self):
        """Listen for events, reconnecting with exponential backoff on failure."""
        import time

        retries = 0
        last_error = ""
        while not self._stop_flag:
            t0 = time.monotonic()
            try:
                self._listen()
                # _listen returns normally only on clean EOF / stop
                if self._stop_flag:
                    return
                last_error = "connection closed"
            except _SSEUnsupported:
                self.connection_status.emit("unsupported", 0, "")
                return
            except Exception as exc:
                # stop() closes the socket, which surfaces here as an error;
                # that is a requested shutdown, not a lost connection.
                if self._stop_flag:
                    return
                last_error = str(exc) or "unknown error"

            # Reset retries if the connection was up for a while
            elapsed = time.monotonic() - t0
            if elapsed > 30:
                retries = 0

            retries += 1
            if retries > self._MAX_RETRIES:
                self.connection_status.emit("gave_up", retries - 1, last_error)
                return

            self.connection_status.emit("disconnected", retries, last_error)
            delay = min(self._BASE_DELAY * (2 ** (retries - 1)), self._MAX_DELAY)
            # Interruptible sleep
            for _ in range(int(delay)):
                if self._stop_flag:
                    return
                self.msleep(1000)

    # -- SSE stream reader --------------------------------------------------

    def _listen(self):
        """Open the event stream and dispatch events until EOF or stop.

        Raises:
            _SSEUnsupported: The server answered 404 or 501 for the endpoint.
            Exception: Any other connection or read error, left for ``run``
                to handle.
        """
        url = f"{_get_api_url().rstrip('/')}/events"
        headers = {"Accept": "text/event-stream"}
        headers.update(_get_auth_headers())
        req = urllib.request.Request(url, headers=headers, method="GET")
        try:
            response = urllib.request.urlopen(
                req, context=_get_ssl_context(), timeout=90
            )
        except urllib.error.HTTPError as e:
            if e.code in (404, 501):
                raise _SSEUnsupported() from e
            raise
        self._response = response

        try:
            self.connection_status.emit("connected", 0, "")
            event_type = ""
            data_buf = ""
            for raw_line in response:
                if self._stop_flag:
                    return
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                if line == "":
                    # Blank line = dispatch event
                    if data_buf:
                        self._dispatch(event_type or "message", data_buf.strip())
                    event_type = ""
                    data_buf = ""
                elif line.startswith("event:"):
                    event_type = line[6:].strip()
                elif line.startswith("data:"):
                    data_buf += line[5:].strip() + "\n"
                # Ignore comments (lines starting with ':') and other fields
        finally:
            # Always release the connection, including on reconnects
            self._response = None
            try:
                response.close()
            except Exception:
                pass

    @staticmethod
    def _to_int(value, default=0):
        """Convert *value* to ``int``, returning *default* if it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _dispatch(self, event_type, data):
        """Decode one event's JSON payload and emit the matching signal.

        Payloads that are not valid JSON objects are ignored so that a single
        malformed event cannot tear down the stream.
        """
        try:
            payload = json.loads(data)
        except (json.JSONDecodeError, ValueError):
            return
        if not isinstance(payload, dict):
            return

        to_int = self._to_int

        if event_type == "server.state":
            mode = payload.get("mode", "")
            if mode:
                self.server_mode_changed.emit(mode)
            return

        # Job lifecycle events (keyed by job_id, not part_number)
        job_id = payload.get("job_id", "")
        if event_type == "job.created":
            self.job_created.emit(
                job_id,
                payload.get("definition_name", ""),
                payload.get("part_number", ""),
            )
            return
        if event_type == "job.claimed":
            self.job_claimed.emit(job_id, payload.get("runner_id", ""))
            return
        if event_type == "job.progress":
            self.job_progress.emit(
                job_id,
                to_int(payload.get("progress", 0)),
                payload.get("message", ""),
            )
            return
        if event_type == "job.completed":
            self.job_completed.emit(job_id)
            return
        if event_type == "job.failed":
            self.job_failed.emit(job_id, payload.get("error", ""))
            return
        if event_type == "job.cancelled":
            self.job_cancelled.emit(job_id)
            return

        # Everything below is keyed by part number
        pn = payload.get("part_number", "")
        if not pn:
            return

        if event_type in ("item_updated", "message"):
            self.item_updated.emit(pn)
        elif event_type == "revision_created":
            self.revision_created.emit(pn, to_int(payload.get("revision", 0)))
        elif event_type == "dag.updated":
            self.dag_updated.emit(
                pn,
                to_int(payload.get("node_count", 0)),
                to_int(payload.get("edge_count", 0)),
            )
        elif event_type == "dag.validated":
            self.dag_validated.emit(
                pn,
                bool(payload.get("valid", False)),
                to_int(payload.get("failed_count", 0)),
            )
        elif event_type == "bom.merged":
            self.bom_merged.emit(
                pn,
                to_int(payload.get("added", 0)),
                to_int(payload.get("updated", 0)),
                to_int(payload.get("unreferenced", 0)),
            )
False self._build_ui() self._refresh_status() self._timer = QtCore.QTimer(self.widget) self._timer.timeout.connect(self._refresh_status) self._timer.start(30000) # Refresh relative timestamps every 60s self._ts_timer = QtCore.QTimer(self.widget) self._ts_timer.timeout.connect(self._rebuild_activity_feed) self._ts_timer.start(60000) # -- UI construction ---------------------------------------------------- def _build_ui(self): from PySide import QtCore, QtGui layout = QtGui.QVBoxLayout(self.widget) layout.setContentsMargins(8, 8, 8, 8) layout.setSpacing(6) # Status row status_row = QtGui.QHBoxLayout() status_row.setSpacing(6) self._status_dot = QtGui.QLabel("\u2b24") self._status_dot.setFixedWidth(16) self._status_dot.setAlignment(QtCore.Qt.AlignCenter) self._status_label = QtGui.QLabel("Checking...") status_row.addWidget(self._status_dot) status_row.addWidget(self._status_label) status_row.addStretch() layout.addLayout(status_row) # User row user_row = QtGui.QHBoxLayout() user_row.setSpacing(6) user_lbl = QtGui.QLabel("User:") user_lbl.setStyleSheet("color: #888;") self._user_label = QtGui.QLabel("(not logged in)") user_row.addWidget(user_lbl) user_row.addWidget(self._user_label) user_row.addStretch() layout.addLayout(user_row) # Role row role_row = QtGui.QHBoxLayout() role_row.setSpacing(6) role_lbl = QtGui.QLabel("Role:") role_lbl.setStyleSheet("color: #888;") self._role_label = QtGui.QLabel("") self._role_label.setStyleSheet("font-size: 11px;") role_row.addWidget(role_lbl) role_row.addWidget(self._role_label) role_row.addStretch() layout.addLayout(role_row) layout.addSpacing(4) # URL row (compact display) url_row = QtGui.QHBoxLayout() url_row.setSpacing(6) url_lbl = QtGui.QLabel("URL:") url_lbl.setStyleSheet("color: #888;") self._url_label = QtGui.QLabel("") self._url_label.setStyleSheet("font-size: 11px;") self._url_label.setWordWrap(True) url_row.addWidget(url_lbl) url_row.addWidget(self._url_label, 1) layout.addLayout(url_row) layout.addSpacing(4) # Live 
def _refresh_status(self):
    """Re-probe the server and bring every status widget up to date.

    Updates the URL, user and role labels, the coloured status dot, the
    login/logout button, the server-mode banner, and finally starts or stops
    the SSE listener to match the authentication state.
    """
    from PySide import QtGui

    global _server_mode

    self._url_label.setText(_get_api_url())

    has_token = _client.is_authenticated()
    username = _get_auth_username()
    role = _get_auth_role()
    source = _get_auth_source()

    # Any failure while probing the server counts as "not reachable".
    try:
        reachable, _detail = _client.check_connection()
    except Exception:
        reachable = False

    # A stored token only counts once the server confirms who it belongs to.
    authed = False
    if reachable and has_token:
        user = _client.get_current_user()
        if user and user.get("username"):
            authed = True
            username = user["username"]
            role = user.get("role", "")
            source = user.get("auth_source", "")
            _save_auth_info(username=username, role=role, source=source)

    if not authed:
        self._user_label.setText("(not logged in)")
        self._role_label.setText("")
    else:
        self._user_label.setText(username)
        if source and role:
            role_text = f"{role} ({source})"
        elif source:
            role_text = source
        else:
            role_text = role or ""
        self._role_label.setText(role_text)

    # Drop whichever handler the button currently has before rewiring it.
    try:
        self._login_btn.clicked.disconnect()
    except RuntimeError:
        pass

    # (dot colour, status text, button caption, button handler)
    if not reachable:
        appearance = ("#F44336", "Disconnected", "Login", self._on_login_clicked)
    elif authed:
        appearance = ("#4CAF50", "Connected", "Logout", self._on_logout_clicked)
    elif has_token:
        # Token exists but is invalid/expired
        appearance = ("#FF9800", "Token invalid", "Login", self._on_login_clicked)
    else:
        appearance = (
            "#FFC107",
            "Connected (no auth)",
            "Login",
            self._on_login_clicked,
        )
    colour, status_text, button_text, handler = appearance
    self._status_dot.setStyleSheet(f"color: {colour}; font-size: 10px;")
    self._status_label.setText(status_text)
    self._login_btn.setText(button_text)
    self._login_btn.clicked.connect(handler)

    # Fetch and display server mode
    _server_mode = _fetch_server_mode() if reachable else "offline"
    self._update_mode_banner()

    # Manage SSE listener based on auth state
    self._sync_event_listener(authed)
def _on_sse_status(self, status, retry, error):
    """Show the SSE listener's connection state in the "Live" label.

    ``status`` is one of ``"connected"``, ``"disconnected"``, ``"gave_up"``
    or ``"unsupported"``; any other value is ignored.  ``retry`` and
    ``error`` are only used for the reconnecting / gave-up messages.
    """
    label = self._sse_label

    if status == "unsupported":
        label.setText("Not available")
        label.setStyleSheet("font-size: 11px; color: #888;")
        return

    if status == "gave_up":
        label.setText("Disconnected")
        label.setStyleSheet("font-size: 11px; color: #F44336;")
        label.setToolTip(error or "Max retries reached")
        FreeCAD.Console.PrintError(
            f"Silo: SSE gave up after {retry} retries: {error}\n"
        )
        return

    if status == "disconnected":
        label.setText(
            f"Reconnecting ({retry}/{SiloEventListener._MAX_RETRIES})..."
        )
        label.setStyleSheet("font-size: 11px; color: #FF9800;")
        label.setToolTip(error or "Connection lost")
        FreeCAD.Console.PrintWarning(
            f"Silo: SSE reconnecting ({retry}/{SiloEventListener._MAX_RETRIES}): {error}\n"
        )
        return

    if status == "connected":
        label.setText("Listening")
        label.setStyleSheet("font-size: 11px; color: #4CAF50;")
        label.setToolTip("")
        FreeCAD.Console.PrintMessage("Silo: SSE connected\n")
        # Populate the activity feed once a live connection exists.
        self._seed_activity_feed()
def _seed_activity_feed(self):
    """One-time: populate the feed with recent items from the database.

    Takes the first ten entries returned by ``_client.list_items()`` and
    prepends one activity row per item, then rebuilds the list widget.
    Seeding is best-effort: a failure is logged as a warning and the feed
    is still rebuilt.  Later calls are no-ops.
    """
    if self._activity_seeded:
        return
    self._activity_seeded = True

    try:
        items = _client.list_items()
        if isinstance(items, list):
            for item in reversed(items[:10]):
                pn = item.get("part_number", "")
                desc = item.get("description", "")
                if desc and len(desc) > 40:
                    desc = desc[:37] + "..."
                text = f"{pn} \u2013 {desc}" if desc else pn

                updated = item.get("updated_at", "")
                ts = datetime.now()
                if updated:
                    try:
                        # Feed timestamps are naive local times (they are
                        # compared against datetime.now()), so convert the
                        # server's offset-aware value to local time before
                        # dropping tzinfo.  Merely stripping tzinfo would
                        # skew the relative age by the local UTC offset.
                        ts = (
                            datetime.fromisoformat(updated.replace("Z", "+00:00"))
                            .astimezone()
                            .replace(tzinfo=None)
                        )
                    except (ValueError, AttributeError, OSError, OverflowError):
                        # Unparseable or out-of-range value: keep "now".
                        pass

                self._activity_events.insert(0, (ts, text, pn))
                self._activity_events = self._activity_events[:50]
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        FreeCAD.Console.PrintWarning(
            f"Silo: Could not seed activity feed: {exc}\n"
        )

    self._rebuild_activity_feed()
def _on_bom_merged(self, part_number, added, updated, unreferenced):
    """Log a BOM merge and add a summary line to the activity feed.

    Only non-zero counts are mentioned; when all three are zero the summary
    reads "no changes".
    """
    candidates = (
        (added, f"+{added} added"),
        (updated, f"~{updated} qty changed"),
        (unreferenced, f"!{unreferenced} unreferenced"),
    )
    fragments = [text for count, text in candidates if count]
    summary = ", ".join(fragments) or "no changes"

    FreeCAD.Console.PrintMessage(
        f"Silo: BOM merged for {part_number} ({summary})\n"
    )
    self._append_activity_event(
        f"\u2630 {part_number} \u2013 BOM synced ({summary})",
        part_number,
    )
def _on_activity_double_click(self, item):
    """Open/checkout the item behind a double-clicked activity entry.

    Entries without a stored part number are ignored.  If a local file is
    found for the part number it is opened directly; otherwise the item is
    opened through ``_sync.open_item``.
    """
    # Read the part number with the same named role the feed uses when it
    # stores it, rather than the bare number 256.
    pn = item.data(QtCore.Qt.UserRole)
    if not pn:
        return
    local_path = find_file_by_part_number(pn)
    if local_path and local_path.exists():
        FreeCAD.openDocument(str(local_path))
    else:
        _sync.open_item(pn)
_on_settings_clicked(self): FreeCADGui.runCommand("Silo_Settings") # Refresh after settings may have changed self._refresh_status() def _show_login_dialog(self): from PySide import QtCore, QtGui dialog = QtGui.QDialog(self.widget) dialog.setWindowTitle("Silo Login") dialog.setMinimumWidth(380) layout = QtGui.QVBoxLayout(dialog) # Server info server_label = QtGui.QLabel(f"Server: {_get_api_url()}") server_label.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(server_label) layout.addSpacing(4) info_label = QtGui.QLabel( "Enter your credentials to create a persistent API token. " "Supports local accounts and LDAP (FreeIPA)." ) info_label.setWordWrap(True) info_label.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(info_label) layout.addSpacing(8) # Username user_label = QtGui.QLabel("Username:") layout.addWidget(user_label) user_input = QtGui.QLineEdit() user_input.setPlaceholderText("Username") last_user = _get_auth_username() if last_user: user_input.setText(last_user) layout.addWidget(user_input) layout.addSpacing(4) # Password pass_label = QtGui.QLabel("Password:") layout.addWidget(pass_label) pass_input = QtGui.QLineEdit() pass_input.setEchoMode(QtGui.QLineEdit.Password) pass_input.setPlaceholderText("Password") layout.addWidget(pass_input) layout.addSpacing(4) # Error / status label status_label = QtGui.QLabel("") status_label.setWordWrap(True) status_label.setVisible(False) layout.addWidget(status_label) layout.addSpacing(8) # Buttons btn_layout = QtGui.QHBoxLayout() login_btn = QtGui.QPushButton("Login") cancel_btn = QtGui.QPushButton("Cancel") btn_layout.addStretch() btn_layout.addWidget(login_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) def on_login(): username = user_input.text().strip() password = pass_input.text() if not username or not password: status_label.setText("Username and password are required.") status_label.setStyleSheet("color: #F44336;") status_label.setVisible(True) return # Disable inputs 
class Silo_Auth:
    """Command that brings the Silo authentication dock panel to the front."""

    def GetResources(self):
        """Return the menu text, tooltip and icon for this command."""
        resources = dict(
            MenuText="Authentication",
            ToolTip="Show Silo authentication status and login",
            Pixmap=_icon("auth"),
        )
        return resources

    def Activated(self):
        """Show and raise the ``SiloDatabaseAuth`` dock widget, if present."""
        from PySide import QtGui

        main_window = FreeCADGui.getMainWindow()
        if main_window is None:
            return
        dock = main_window.findChild(QtGui.QDockWidget, "SiloDatabaseAuth")
        if not dock:
            return
        dock.show()
        dock.raise_()

    def IsActive(self):
        """The command is always available."""
        return True
self.dialog.setWindowTitle("Jobs") self.dialog.setMinimumWidth(850) self.dialog.setMinimumHeight(500) layout = QtGui.QVBoxLayout(self.dialog) # -- Filter bar -- filter_layout = QtGui.QHBoxLayout() self._status_combo = QtGui.QComboBox() self._status_combo.addItems( ["All", "pending", "claimed", "running", "completed", "failed", "cancelled"] ) self._status_combo.currentIndexChanged.connect(self._refresh) filter_layout.addWidget(QtGui.QLabel("Status:")) filter_layout.addWidget(self._status_combo) self._search_edit = QtGui.QLineEdit() self._search_edit.setPlaceholderText("Filter by item or definition...") self._search_edit.returnPressed.connect(self._refresh) filter_layout.addWidget(self._search_edit) filter_layout.addStretch() trigger_btn = QtGui.QPushButton("Trigger Job...") trigger_btn.clicked.connect(self._trigger_job) filter_layout.addWidget(trigger_btn) layout.addLayout(filter_layout) # -- Splitter: table + detail -- splitter = QtGui.QSplitter(QtCore.Qt.Vertical) layout.addWidget(splitter) # Job table self._table = QtGui.QTableWidget() self._table.setColumnCount(7) self._table.setHorizontalHeaderLabels( [ "Status", "Definition", "Item", "Runner", "Progress", "Created", "Duration", ] ) self._table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) self._table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) self._table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) self._table.horizontalHeader().setStretchLastSection(True) self._table.currentCellChanged.connect(self._on_selection_changed) splitter.addWidget(self._table) # Detail panel detail_widget = QtGui.QWidget() detail_layout = QtGui.QVBoxLayout(detail_widget) detail_layout.setContentsMargins(0, 0, 0, 0) detail_header = QtGui.QHBoxLayout() self._detail_label = QtGui.QLabel("Select a job to view details") detail_header.addWidget(self._detail_label) detail_header.addStretch() self._cancel_btn = QtGui.QPushButton("Cancel Job") self._cancel_btn.setEnabled(False) 
def _refresh(self):
    """Reload the job list from the server and repopulate the table.

    The status combo and the search field are sent as server-side filters.
    When the dialog is scoped to a part number, jobs are additionally
    filtered client-side.  A failed request is logged and shown as an
    empty table.
    """
    from PySide import QtGui

    status_filter = self._status_combo.currentText()
    if status_filter == "All":
        status_filter = ""

    try:
        jobs = _client.list_jobs(
            status=status_filter,
            definition=self._search_edit.text(),
        )
    except Exception as e:
        FreeCAD.Console.PrintError(f"Silo: Failed to list jobs: {e}\n")
        jobs = []
    # NOTE(review): guards against the client handing back None (for example
    # a JSON null body for "no jobs"); len() below would otherwise raise.
    self._jobs = jobs or []

    # Filter by part_number client-side if scoped
    if self._part_number:
        self._jobs = [
            j
            for j in self._jobs
            if j.get("part_number") == self._part_number
            or j.get("item_id") == self._part_number
        ]

    self._table.setRowCount(len(self._jobs))
    for row, job in enumerate(self._jobs):
        # Text fields use "or ''" so that a key present with a null value
        # (e.g. no runner yet) becomes an empty cell instead of passing None
        # to QTableWidgetItem.
        status = job.get("status") or ""
        icon = _STATUS_ICONS.get(status, "?")

        progress = job.get("progress", 0)
        progress_msg = job.get("progress_message", "")
        progress_text = f"{progress}%" if progress else ""
        if progress_msg:
            progress_text += f" {progress_msg}"

        created = job.get("created_at") or ""
        if len(created) > 16:
            # Keep "YYYY-MM-DD HH:MM" of an ISO timestamp
            created = created[:16].replace("T", " ")

        duration = job.get("duration_seconds")
        dur_text = f"{duration}s" if duration else ""

        cells = (
            f"{icon} {status}",
            job.get("definition_name") or "",
            job.get("part_number") or "",
            job.get("runner_name") or "",
            progress_text,
            created,
            dur_text,
        )
        for col, text in enumerate(cells):
            self._table.setItem(row, col, QtGui.QTableWidgetItem(text))

    self._table.resizeColumnsToContents()
def _cancel_job(self):
    """Cancel the selected job after the user confirms, then refresh.

    Does nothing when no valid row is selected or the user declines.  A
    failed cancellation is reported in a warning box; the table is
    refreshed either way.
    """
    from PySide import QtGui

    box = QtGui.QMessageBox

    selected = self._table.currentRow()
    if not 0 <= selected < len(self._jobs):
        return

    job = self._jobs[selected]
    job_id = job.get("id", "")
    definition = job.get("definition_name", "")
    part = job.get("part_number", "")

    answer = box.question(
        self.dialog,
        "Cancel Job",
        f"Cancel job {definition} for {part}?",
        box.Yes | box.No,
    )
    if answer != box.Yes:
        return

    try:
        _client.cancel_job(job_id)
        FreeCAD.Console.PrintMessage(f"Silo: Job {job_id} cancelled\n")
    except Exception as exc:
        box.warning(self.dialog, "Cancel Failed", f"Failed to cancel job:\n{exc}")

    self._refresh()
class Silo_Jobs:
    """Command that opens the job monitor dialog."""

    def GetResources(self):
        """Return the menu text, tooltip and icon for this command."""
        resources = dict(
            MenuText="Jobs",
            ToolTip="View and manage compute jobs",
            Pixmap=_icon("info"),
        )
        return resources

    def Activated(self):
        """Open the job monitor, scoped to the active document's part if any.

        The part number is taken from the document's tracked object when it
        has a ``SiloPartNumber`` attribute; otherwise the dialog is unscoped.
        """
        document = FreeCAD.ActiveDocument
        tracked = get_tracked_object(document) if document else None
        if tracked and hasattr(tracked, "SiloPartNumber"):
            part_number = tracked.SiloPartNumber
        else:
            part_number = None

        dialog = JobMonitorDialog(
            parent=FreeCADGui.getMainWindow(),
            part_number=part_number,
        )
        dialog.exec_()

    def IsActive(self):
        """The command is enabled only while the client is authenticated."""
        return _client.is_authenticated()
def _refresh(self):
    """Reload the runner list from the server and repopulate the table.

    A failed request is logged and shown as an empty table.
    """
    from PySide import QtGui

    try:
        runners = _client.list_runners()
    except Exception as e:
        FreeCAD.Console.PrintError(f"Silo: Failed to list runners: {e}\n")
        runners = []
    # NOTE(review): guards against the client handing back None (for example
    # a JSON null body for "no runners"); len() below would otherwise raise.
    self._runners = runners or []

    self._table.setRowCount(len(self._runners))
    for row, runner in enumerate(self._runners):
        # dict.get(key, default) only covers a *missing* key; a key present
        # with a null value still yields None, so the text fields coalesce
        # explicitly before they reach join() / QTableWidgetItem.
        name = runner.get("name") or ""
        tags = ", ".join(str(tag) for tag in (runner.get("tags") or []))

        status = runner.get("status", "unknown")
        if status is None:
            status = "unknown"
        icon = "\u2705" if status == "online" else "\u26aa"

        heartbeat = runner.get("last_heartbeat") or ""
        if len(heartbeat) > 16:
            # Keep "YYYY-MM-DD HH:MM" of an ISO timestamp
            heartbeat = heartbeat[:16].replace("T", " ")

        jobs = runner.get("jobs_completed") or 0

        cells = (name, tags, f"{icon} {status}", heartbeat, str(jobs))
        for col, text in enumerate(cells):
            self._table.setItem(row, col, QtGui.QTableWidgetItem(text))

    self._table.resizeColumnsToContents()
create,linux):", ) if not ok: return tags = [t.strip() for t in tags_str.split(",") if t.strip()] try: result = _client.register_runner(name, tags) token = result.get("token", "") QtGui.QMessageBox.information( self.dialog, "Runner Registered", f"Runner {name} registered.\n\n" f"Token (copy now — shown only once):\n\n" f"{token}", ) except Exception as e: QtGui.QMessageBox.warning( self.dialog, "Registration Failed", f"Failed to register runner:\n{e}", ) self._refresh() def _delete_runner(self): from PySide import QtGui row = self._table.currentRow() if row < 0 or row >= len(self._runners): return runner = self._runners[row] runner_name = runner.get("name", "") runner_id = runner.get("id", "") reply = QtGui.QMessageBox.question( self.dialog, "Delete Runner", f"Delete runner {runner_name}?\n\nThis will invalidate its token.", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return try: _client.delete_runner(runner_id) FreeCAD.Console.PrintMessage(f"Silo: Runner {runner_name} deleted\n") except Exception as e: QtGui.QMessageBox.warning( self.dialog, "Delete Failed", f"Failed to delete runner:\n{e}", ) self._refresh() def exec_(self): self.dialog.exec_() class Silo_Runners: """Manage compute runners (admin).""" def GetResources(self): return { "MenuText": "Runners", "ToolTip": "Manage compute runners (admin)", "Pixmap": _icon("info"), } def Activated(self): admin = RunnerAdminDialog(parent=FreeCADGui.getMainWindow()) admin.exec_() def IsActive(self): return _client.is_authenticated() # --------------------------------------------------------------------------- # Start panel # --------------------------------------------------------------------------- class SiloStartPanel: """Content widget for the Silo Start Panel dock.""" def __init__(self): from PySide import QtCore, QtGui self.widget = QtGui.QWidget() self._build_ui() self._refresh() self._timer = QtCore.QTimer(self.widget) self._timer.timeout.connect(self._refresh) 
self._timer.start(60000) # Refresh every 60 seconds def _build_ui(self): from PySide import QtCore, QtGui layout = QtGui.QVBoxLayout(self.widget) layout.setContentsMargins(8, 8, 8, 8) layout.setSpacing(6) # Connection status badge status_row = QtGui.QHBoxLayout() status_row.setSpacing(6) self._conn_dot = QtGui.QLabel("\u2b24") self._conn_dot.setFixedWidth(16) self._conn_dot.setAlignment(QtCore.Qt.AlignCenter) self._conn_label = QtGui.QLabel("Checking...") self._conn_label.setStyleSheet("font-weight: bold;") status_row.addWidget(self._conn_dot) status_row.addWidget(self._conn_label) status_row.addStretch() layout.addLayout(status_row) layout.addSpacing(8) # My Checkouts section checkout_header = QtGui.QLabel("My Checkouts") checkout_header.setStyleSheet("font-weight: bold; font-size: 12px;") layout.addWidget(checkout_header) self._checkout_list = QtGui.QListWidget() self._checkout_list.setMaximumHeight(160) self._checkout_list.setAlternatingRowColors(True) self._checkout_list.itemDoubleClicked.connect(self._on_checkout_clicked) layout.addWidget(self._checkout_list) layout.addSpacing(8) # Recent Silo Activity section activity_header = QtGui.QLabel("Recent Silo Activity") activity_header.setStyleSheet("font-weight: bold; font-size: 12px;") layout.addWidget(activity_header) self._activity_list = QtGui.QListWidget() self._activity_list.setMaximumHeight(200) self._activity_list.setAlternatingRowColors(True) self._activity_list.itemDoubleClicked.connect(self._on_activity_clicked) layout.addWidget(self._activity_list) layout.addSpacing(8) # Local Recent Files section local_header = QtGui.QLabel("Local Recent Files") local_header.setStyleSheet("font-weight: bold; font-size: 12px;") layout.addWidget(local_header) self._local_list = QtGui.QListWidget() self._local_list.setMaximumHeight(160) self._local_list.setAlternatingRowColors(True) self._local_list.itemDoubleClicked.connect(self._on_local_clicked) layout.addWidget(self._local_list) layout.addStretch() def _refresh(self): 
self._refresh_connection() self._refresh_checkouts() self._refresh_activity() self._refresh_local() def _refresh_connection(self): try: reachable, _ = _client.check_connection() except Exception: reachable = False if reachable: api_url = _get_api_url() parsed = urllib.parse.urlparse(api_url) hostname = parsed.hostname or api_url self._conn_dot.setStyleSheet("color: #4CAF50; font-size: 10px;") self._conn_label.setText(f"Connected to {hostname}") else: self._conn_dot.setStyleSheet("color: #888; font-size: 10px;") self._conn_label.setText("Disconnected") def _refresh_checkouts(self): self._checkout_list.clear() try: local_files = search_local_files() for item in local_files[:15]: pn = item.get("part_number", "") desc = item.get("description", "") modified = (item.get("modified") or "")[:10] label = f"{pn} {desc}" if modified: label += f" ({modified})" list_item = self._checkout_list.addItem(label) except Exception: self._checkout_list.addItem("(Unable to scan local files)") if self._checkout_list.count() == 0: self._checkout_list.addItem("(No local checkouts)") def _refresh_activity(self): self._activity_list.clear() try: reachable, _ = _client.check_connection() except Exception: reachable = False if not reachable: self._activity_list.addItem("(Not connected)") return try: items = _client.list_items() if isinstance(items, list): # Collect local part numbers for badge comparison local_pns = set() try: for lf in search_local_files(): local_pns.add(lf.get("part_number", "")) except Exception: pass for item in items[:10]: pn = item.get("part_number", "") desc = item.get("description", "") updated = (item.get("updated_at") or "")[:10] badge = "\u2713 " if pn in local_pns else "" label = f"{badge}{pn} {desc}" if updated: label += f" ({updated})" self._activity_list.addItem(label) if self._activity_list.count() == 0: self._activity_list.addItem("(No items in database)") except Exception: self._activity_list.addItem("(Unable to fetch activity)") def _refresh_local(self): 
class Silo_StartPanel:
    """Show the Silo Start Panel."""

    # Strong reference to the SiloStartPanel controller behind the dock.
    # The dock widget only owns ``content.widget``; the controller's timer
    # and double-click slots are bound methods, and PySide does not keep
    # the owner of a bound-method slot alive, so it must be held here.
    _panel = None

    def GetResources(self):
        """Return the menu text, tooltip and icon for this command."""
        return {
            "MenuText": "Start Panel",
            "ToolTip": "Show Silo start panel with checkouts and recent activity",
            "Pixmap": _icon("open"),
        }

    def Activated(self):
        """Show the Silo dock, creating it on first use."""
        from PySide import QtCore, QtGui

        mw = FreeCADGui.getMainWindow()
        if mw is None:
            return

        # Reuse existing panel if it exists
        panel = mw.findChild(QtGui.QDockWidget, "SiloStartPanel")
        if panel is not None:
            panel.show()
            panel.raise_()
            return

        # Create new dock widget
        content = SiloStartPanel()
        self._panel = content  # keep the slots' owner alive (see _panel)

        dock = QtGui.QDockWidget("Silo", mw)
        dock.setObjectName("SiloStartPanel")
        dock.setWidget(content.widget)
        dock.setAllowedAreas(
            QtCore.Qt.LeftDockWidgetArea | QtCore.Qt.RightDockWidgetArea
        )
        mw.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock)

    def IsActive(self):
        """The start panel is always available."""
        return True
F, M01 (empty = all)") form.addRow("Categories:", cat_edit) # Author author_edit = QtWidgets.QLineEdit(default_author) form.addRow("Author:", author_edit) # Tags (comma-separated) tags_edit = QtWidgets.QLineEdit() tags_edit.setPlaceholderText("e.g. sheet metal, fabrication") form.addRow("Tags:", tags_edit) layout.addLayout(form) # Buttons btn_layout = QtWidgets.QHBoxLayout() btn_layout.addStretch() cancel_btn = QtWidgets.QPushButton("Cancel") cancel_btn.clicked.connect(dlg.reject) btn_layout.addWidget(cancel_btn) save_btn = QtWidgets.QPushButton("Save Template") save_btn.setDefault(True) save_btn.clicked.connect(dlg.accept) btn_layout.addWidget(save_btn) layout.addLayout(btn_layout) if dlg.exec_() != QtWidgets.QDialog.Accepted: return None name = name_edit.text().strip() if not name: return None selected_types = [] for i in range(type_list.count()): item = type_list.item(i) if item.isSelected(): selected_types.append(item.data(QtCore.Qt.UserRole)) categories = [c.strip() for c in cat_edit.text().split(",") if c.strip()] tags = [t.strip() for t in tags_edit.text().split(",") if t.strip()] return { "template_version": "1.0", "name": name, "description": desc_edit.text().strip(), "item_types": selected_types, "categories": categories, "icon": "", "author": author_edit.text().strip(), "tags": tags, } class Silo_SaveAsTemplate: """Save a copy of the current document as a reusable template.""" def GetResources(self): return { "MenuText": "Save as Template", "ToolTip": "Save a copy of this document as a reusable template", "Pixmap": _icon("new"), } def Activated(self): import shutil from PySide import QtGui from templates import get_default_template_dir, inject_template_json doc = FreeCAD.ActiveDocument if not doc: return if not doc.FileName: QtGui.QMessageBox.warning( None, "Save as Template", "Please save the document first.", ) return # Capture template metadata from user dialog = SaveAsTemplateDialog(doc) template_info = dialog.exec_() if not template_info: return # 
Determine destination dest_dir = get_default_template_dir() filename = _sanitize_filename(template_info["name"]) + ".kc" dest = os.path.join(dest_dir, filename) # Check for overwrite if os.path.exists(dest): reply = QtGui.QMessageBox.question( None, "Save as Template", f"Template '{filename}' already exists. Overwrite?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return # Save current state, then copy doc.save() shutil.copy2(doc.FileName, dest) # Strip Silo identity and inject template descriptor _sync._clean_template_zip(dest) inject_template_json(dest, template_info) FreeCAD.Console.PrintMessage(f"Template saved: {dest}\n") # Offer Silo upload if connected if _server_mode == "normal": reply = QtGui.QMessageBox.question( None, "Save as Template", "Upload template to Silo for team sharing?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply == QtGui.QMessageBox.Yes: self._upload_to_silo(dest, template_info) QtGui.QMessageBox.information( None, "Save as Template", f"Template '{template_info['name']}' saved.", ) @staticmethod def _upload_to_silo(file_path, template_info): """Upload template to Silo as a shared item. 
class _DiagWorker(QtCore.QThread):
    """Background worker that runs connectivity diagnostics.

    ``run`` performs the DNS, health, readiness, auth and SSE checks in that
    order. Each check emits ``result`` exactly once; ``finished_all`` is
    emitted after the last check.
    """

    result = QtCore.Signal(str, bool, str)  # (test_name, passed, detail)
    finished_all = QtCore.Signal()

    _HTTP_TIMEOUT = 10
    # NOTE(review): not referenced by any method of this class;
    # socket.getaddrinfo() takes no timeout argument, so the DNS check is
    # currently unbounded.
    _DNS_TIMEOUT = 5

    def run(self):
        """Run every diagnostic in sequence, then signal completion."""
        try:
            api_url = _get_api_url()
            api_root = api_url.rstrip("/")
            # Health/ready endpoints are addressed at the server root, so
            # drop a trailing "/api" segment from the configured URL.
            base_url = api_root[:-4] if api_root.endswith("/api") else api_root

            self._test_dns(base_url)
            self._test_http("Health", f"{base_url}/health", auth=False)
            self._test_http("Ready", f"{base_url}/ready", auth=False)
            self._test_http("Auth", f"{api_root}/auth/me", auth=True)
            self._test_sse(api_url)
        finally:
            # Always notify listeners, even if a check failed unexpectedly.
            self.finished_all.emit()

    def _test_dns(self, base_url):
        """Resolve the host name of *base_url* and report the first address."""
        hostname = urllib.parse.urlparse(base_url).hostname
        if not hostname:
            self.result.emit("DNS", False, "no hostname in URL")
            return
        try:
            addrs = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
            first_ip = addrs[0][4][0] if addrs else "?"
            self.result.emit("DNS", True, f"{hostname} -> {first_ip}")
        except socket.gaierror as e:
            self.result.emit("DNS", False, f"{hostname}: {e}")
        except Exception as e:
            self.result.emit("DNS", False, str(e))

    @staticmethod
    def _summarize_body(body):
        """Return the detail suffix describing a response body.

        For a JSON object, the ``status``/``username``/``role`` keys that are
        present are listed in parentheses. A non-JSON body of at most 80
        characters is appended verbatim. Anything else yields ``""``.
        """
        try:
            data = json.loads(body)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return f" {body}" if len(body) <= 80 else ""
        if isinstance(data, dict):
            parts = [
                f"{key}={data[key]}"
                for key in ("status", "username", "role")
                if key in data
            ]
            if parts:
                return f" ({', '.join(parts)})"
        return ""

    def _test_http(self, name, url, auth=False):
        """GET *url* and emit a result named *name*.

        Args:
            name: Label used for the emitted result.
            url: Absolute URL to request.
            auth: When true, the headers from ``_get_auth_headers()`` are sent.
        """
        try:
            headers = {}
            if auth:
                headers.update(_get_auth_headers())
            req = urllib.request.Request(url, headers=headers, method="GET")
            # The context manager closes the connection on every path.
            with urllib.request.urlopen(
                req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
            ) as resp:
                code = resp.getcode()
                body = resp.read(4096).decode("utf-8", errors="replace").strip()
            detail = f"HTTP {code}" + self._summarize_body(body)
            self.result.emit(name, code < 400, detail)
        except urllib.error.HTTPError as e:
            self.result.emit(name, False, f"HTTP {e.code} {e.reason}")
        except urllib.error.URLError as e:
            self.result.emit(name, False, str(e.reason))
        except Exception as e:
            self.result.emit(name, False, str(e))

    def _test_sse(self, api_url):
        """Open the ``/events`` endpoint and report its status and type.

        Only the response headers are inspected; the stream is closed
        without reading any events.
        """
        url = f"{api_url.rstrip('/')}/events"
        try:
            headers = {"Accept": "text/event-stream"}
            headers.update(_get_auth_headers())
            req = urllib.request.Request(url, headers=headers, method="GET")
            with urllib.request.urlopen(
                req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
            ) as resp:
                content_type = resp.headers.get("Content-Type", "")
                code = resp.getcode()
            if "text/event-stream" in content_type:
                self.result.emit("SSE", True, f"HTTP {code} event-stream")
            else:
                self.result.emit("SSE", True, f"HTTP {code} (type: {content_type})")
        except urllib.error.HTTPError as e:
            if e.code in (404, 501):
                self.result.emit("SSE", False, f"HTTP {e.code} (not supported)")
            else:
                self.result.emit("SSE", False, f"HTTP {e.code} {e.reason}")
        except urllib.error.URLError as e:
            self.result.emit("SSE", False, str(e.reason))
        except Exception as e:
            self.result.emit("SSE", False, str(e))