"""Silo FreeCAD commands - Streamlined workflow for CAD file management.""" import json import os import re import socket import urllib.error import urllib.parse import urllib.request from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import FreeCAD import FreeCADGui from PySide import QtCore from silo_client import SiloClient, SiloSettings # Preference group for Kindred Silo settings _PREF_GROUP = "User parameter:BaseApp/Preferences/Mod/KindredSilo" # Configuration - preferences take priority over env vars SILO_PROJECTS_DIR = os.environ.get( "SILO_PROJECTS_DIR", os.path.expanduser("~/projects") ) # --------------------------------------------------------------------------- # Local utility helpers (previously in silo_client, now server-driven) # --------------------------------------------------------------------------- def _parse_part_number(part_number: str) -> Tuple[str, str]: """Parse part number into ``(category, sequence)``. E.g. ``"F01-0001"`` -> ``("F01", "0001")``.""" parts = part_number.split("-") if len(parts) >= 2: return parts[0], parts[1] return part_number, "" def _sanitize_filename(name: str) -> str: """Sanitize a string for use in filenames.""" sanitized = re.sub(r'[<>:"/\\|?*]', "_", name) sanitized = re.sub(r"[\s_]+", "_", sanitized) sanitized = sanitized.strip("_ ") return sanitized[:50] def _relative_time(dt): """Format a datetime as a human-friendly relative string.""" now = datetime.now() diff = now - dt seconds = int(diff.total_seconds()) if seconds < 60: return "just now" minutes = seconds // 60 if minutes < 60: return f"{minutes}m ago" hours = minutes // 60 if hours < 24: return f"{hours}h ago" days = hours // 24 if days < 30: return f"{days}d ago" return dt.strftime("%Y-%m-%d") # --------------------------------------------------------------------------- # FreeCAD settings adapter # --------------------------------------------------------------------------- class FreeCADSiloSettings(SiloSettings): """SiloSettings backed by FreeCAD preferences.""" def get_api_url(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) url = param.GetString("ApiUrl", "") if not url: url = os.environ.get("SILO_API_URL", "http://localhost:8080/api") url = url.rstrip("/") if url: parsed = urllib.parse.urlparse(url) if not parsed.path or parsed.path == "/": url = url + "/api" return url def get_api_token(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) token = param.GetString("ApiToken", "") if not token: token = os.environ.get("SILO_API_TOKEN", "") return token def get_ssl_verify(self) -> bool: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetBool("SslVerify", True) def get_ssl_cert_path(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("SslCertPath", "") def save_auth( self, username: str, role: str = "", source: str = "", token: str = "" ): param = FreeCAD.ParamGet(_PREF_GROUP) param.SetString("AuthUsername", username) param.SetString("AuthRole", role) param.SetString("AuthSource", source) if token: param.SetString("ApiToken", token) def get_schema_name(self) -> str: param = FreeCAD.ParamGet(_PREF_GROUP) name = param.GetString("SchemaName", "") if not name: name = os.environ.get("SILO_SCHEMA", "kindred-rd") return name def clear_auth(self): param = FreeCAD.ParamGet(_PREF_GROUP) param.SetString("ApiToken", "") param.SetString("AuthUsername", "") param.SetString("AuthRole", "") param.SetString("AuthSource", "") _fc_settings = FreeCADSiloSettings() # --------------------------------------------------------------------------- # Auth info helpers (read from preferences for UI display) # --------------------------------------------------------------------------- def _get_auth_username() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthUsername", "") def _get_auth_role() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthRole", "") def _get_auth_source() -> str: param = FreeCAD.ParamGet(_PREF_GROUP) return param.GetString("AuthSource", "") def _get_auth_token() -> str: return _fc_settings.get_api_token() # Thin wrappers so command classes can call these without refactoring. # They delegate to the settings adapter or the shared client. def _get_api_url() -> str: return _fc_settings.get_api_url() def _get_schema_name() -> str: return _fc_settings.get_schema_name() def _get_ssl_verify() -> bool: return _fc_settings.get_ssl_verify() def _get_ssl_context(): from silo_client._ssl import build_ssl_context return build_ssl_context( _fc_settings.get_ssl_verify(), _fc_settings.get_ssl_cert_path() ) def _get_auth_headers() -> Dict[str, str]: token = _fc_settings.get_api_token() if not token: token = os.environ.get("SILO_API_TOKEN", "") if token: return {"Authorization": f"Bearer {token}"} return {} def _save_auth_info(username: str, role: str = "", source: str = "", token: str = ""): _fc_settings.save_auth(username, role, source, token) def _clear_auth(): _fc_settings.clear_auth() # --------------------------------------------------------------------------- # Server mode tracking # --------------------------------------------------------------------------- _server_mode = "offline" # "normal" | "read-only" | "offline" def _fetch_server_mode() -> str: """Fetch server mode from the /ready endpoint. Returns one of: "normal", "read-only", "offline". """ api_url = _get_api_url().rstrip("/") base_url = api_url[:-4] if api_url.endswith("/api") else api_url url = f"{base_url}/ready" try: req = urllib.request.Request(url, method="GET") resp = urllib.request.urlopen(req, context=_get_ssl_context(), timeout=10) body = resp.read(4096).decode("utf-8", errors="replace") data = json.loads(body) status = data.get("status", "") if status in ("ok", "ready"): return "normal" if status in ("read-only", "read_only", "readonly"): return "read-only" # Unknown status but server responded — treat as normal return "normal" except Exception: return "offline" # --------------------------------------------------------------------------- # Icon helper # --------------------------------------------------------------------------- _ICON_DIR = os.path.join( os.path.dirname(os.path.abspath(__file__)), "resources", "icons" ) def _icon(name): """Get icon path by name.""" if _ICON_DIR: path = os.path.join(_ICON_DIR, f"silo-{name}.svg") if os.path.exists(path): return path return "" # --------------------------------------------------------------------------- # FreeCAD-specific path utilities # --------------------------------------------------------------------------- def get_projects_dir() -> Path: """Get the projects directory.""" projects_dir = Path(SILO_PROJECTS_DIR) projects_dir.mkdir(parents=True, exist_ok=True) return projects_dir def get_cad_file_path(part_number: str, description: str = "") -> Path: """Generate canonical file path for a CAD file. Path format: ~/projects/cad/{category_code}/{part_number}_{description}.kc """ category, _ = _parse_part_number(part_number) if description: filename = f"{part_number}_{_sanitize_filename(description)}.kc" else: filename = f"{part_number}.kc" return get_projects_dir() / "cad" / category / filename def find_file_by_part_number(part_number: str) -> Optional[Path]: """Find existing CAD file for a part number. Prefers .kc over .FCStd.""" category, _ = _parse_part_number(part_number) cad_dir = get_projects_dir() / "cad" / category for search_dir in _search_dirs(cad_dir): for ext in ("*.kc", "*.FCStd"): matches = list(search_dir.glob(f"{part_number}{ext[1:]}")) if matches: return matches[0] return None def _search_dirs(category_dir: Path): """Yield the category dir, then all sibling dirs under cad/.""" if category_dir.exists(): yield category_dir base_cad_dir = category_dir.parent if base_cad_dir.exists(): for subdir in base_cad_dir.iterdir(): if subdir.is_dir() and subdir != category_dir: yield subdir def search_local_files(search_term: str = "", category_filter: str = "") -> list: """Search for CAD files in local cad directory.""" results = [] cad_dir = get_projects_dir() / "cad" if not cad_dir.exists(): return results search_lower = search_term.lower() for category_dir in cad_dir.iterdir(): if not category_dir.is_dir(): continue folder_name = category_dir.name category_code = folder_name.split("_")[0] if "_" in folder_name else folder_name if category_filter and category_code.upper() != category_filter.upper(): continue for fcstd_file in sorted( list(category_dir.glob("*.kc")) + list(category_dir.glob("*.FCStd")) ): filename = fcstd_file.stem parts = filename.split("_", 1) part_number = parts[0] description = parts[1].replace("_", " ") if len(parts) > 1 else "" if search_term: searchable = f"{part_number} {description}".lower() if search_lower not in searchable: continue try: from datetime import datetime mtime = fcstd_file.stat().st_mtime modified = datetime.fromtimestamp(mtime).isoformat() except Exception: modified = None results.append( { "path": str(fcstd_file), "part_number": part_number, "description": description, "category": category_code, "modified": modified, "source": "local", } ) results.sort(key=lambda x: x.get("modified") or "", reverse=True) return results def _safe_float(val): """Convert float to JSON-safe value, handling NaN and Infinity.""" import math if isinstance(val, float): if math.isnan(val) or math.isinf(val): return 0.0 return val def collect_document_properties(doc) -> Dict[str, Any]: """Collect properties from all objects in a document.""" result = { "_document_name": doc.Name, "_file_name": doc.FileName or None, "objects": {}, } for obj in doc.Objects: if obj.TypeId.startswith("App::") and obj.TypeId not in ("App::Part",): continue props = {"_object_type": obj.TypeId, "_label": obj.Label} if hasattr(obj, "Placement"): p = obj.Placement props["placement"] = { "position": { "x": _safe_float(p.Base.x), "y": _safe_float(p.Base.y), "z": _safe_float(p.Base.z), }, "rotation": { "axis": { "x": _safe_float(p.Rotation.Axis.x), "y": _safe_float(p.Rotation.Axis.y), "z": _safe_float(p.Rotation.Axis.z), }, "angle": _safe_float(p.Rotation.Angle), }, } if hasattr(obj, "Shape") and obj.Shape: try: bbox = obj.Shape.BoundBox props["bounding_box"] = { "x_length": _safe_float(bbox.XLength), "y_length": _safe_float(bbox.YLength), "z_length": _safe_float(bbox.ZLength), } if hasattr(obj.Shape, "Volume"): props["volume"] = _safe_float(obj.Shape.Volume) except Exception: pass result["objects"][obj.Label] = props return result def set_silo_properties(obj, props: Dict[str, Any]): """Set Silo properties on FreeCAD object.""" for name, value in props.items(): if not hasattr(obj, name): if isinstance(value, str): obj.addProperty("App::PropertyString", name, "Silo", "") elif isinstance(value, int): obj.addProperty("App::PropertyInteger", name, "Silo", "") setattr(obj, name, value) def get_tracked_object(doc): """Find the primary tracked object in a document.""" for obj in doc.Objects: if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber: return obj return None # --------------------------------------------------------------------------- # Client and sync instances # --------------------------------------------------------------------------- _client = SiloClient(_fc_settings) class SiloSync: """Handles synchronization between FreeCAD and Silo.""" def __init__(self, client: SiloClient = None): self.client = client or _client def save_to_canonical_path(self, doc, force_rename: bool = False) -> Optional[Path]: """Save document to canonical path.""" obj = get_tracked_object(doc) if not obj: return None part_number = obj.SiloPartNumber try: item = self.client.get_item(part_number) description = item.get("description", "") new_path = get_cad_file_path(part_number, description) new_path.parent.mkdir(parents=True, exist_ok=True) existing_path = find_file_by_part_number(part_number) current_path = Path(doc.FileName) if doc.FileName else None # Use save() if already at the correct path, saveAs() only if path changes if current_path and current_path == new_path: doc.save() elif ( existing_path and existing_path != new_path and (force_rename or current_path == existing_path) ): doc.saveAs(str(new_path)) try: existing_path.unlink() except OSError: pass else: doc.saveAs(str(new_path)) return new_path except Exception as e: FreeCAD.Console.PrintError(f"Save failed: {e}\n") return None def create_document_for_item(self, item: Dict[str, Any], save: bool = True): """Create a new FreeCAD document for a database item.""" part_number = item.get("part_number", "") description = item.get("description", "") item_type = item.get("item_type", "part") if not part_number: return None doc = FreeCAD.newDocument(part_number) safe_name = "_" + part_number if item_type == "assembly": # Create an Assembly object for assembly items (FreeCAD 1.0+) try: assembly_obj = doc.addObject("Assembly::AssemblyObject", safe_name) assembly_obj.Label = part_number set_silo_properties( assembly_obj, { "SiloItemId": item.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) except Exception as e: # Fallback to App::Part if Assembly workbench not available FreeCAD.Console.PrintWarning( f"Assembly workbench not available, using App::Part: {e}\n" ) part_obj = doc.addObject("App::Part", safe_name) part_obj.Label = part_number set_silo_properties( part_obj, { "SiloItemId": item.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) else: # Create a Part container for non-assembly items part_obj = doc.addObject("App::Part", safe_name) part_obj.Label = part_number set_silo_properties( part_obj, { "SiloItemId": item.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) # Add a Body for parts (not assemblies) body_label = _sanitize_filename(description) if description else "Body" body = doc.addObject("PartDesign::Body", "_" + body_label) body.Label = body_label part_obj.addObject(body) doc.recompute() if save: file_path = get_cad_file_path(part_number, description) file_path.parent.mkdir(parents=True, exist_ok=True) doc.saveAs(str(file_path)) return doc def create_document_from_template( self, item: Dict[str, Any], template_path: str, save: bool = True ): """Create a new document by copying a template .kc and stamping Silo properties. Falls back to :meth:`create_document_for_item` if *template_path* is missing or invalid. """ import shutil part_number = item.get("part_number", "") description = item.get("description", "") item_type = item.get("item_type", "part") if not part_number or not os.path.isfile(template_path): return self.create_document_for_item(item, save=save) dest_path = get_cad_file_path(part_number, description) dest_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(template_path, str(dest_path)) self._clean_template_zip(str(dest_path)) doc = FreeCAD.openDocument(str(dest_path)) if not doc: return None # Find the root container (first App::Part or Assembly) root_obj = None for obj in doc.Objects: if obj.TypeId in ("App::Part", "Assembly::AssemblyObject"): root_obj = obj break if root_obj is None and doc.Objects: root_obj = doc.Objects[0] if root_obj: root_obj.Label = part_number set_silo_properties( root_obj, { "SiloItemId": item.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) doc.recompute() if save: doc.save() return doc @staticmethod def _clean_template_zip(filepath: str): """Strip ``silo/template.json`` and ``silo/manifest.json`` from a copied template. The manifest is auto-recreated by ``kc_format.py`` on next save. """ import zipfile tmp_path = filepath + ".tmp" try: with zipfile.ZipFile(filepath, "r") as zf_in: with zipfile.ZipFile(tmp_path, "w") as zf_out: for entry in zf_in.infolist(): if entry.filename in ( "silo/template.json", "silo/manifest.json", ): continue zf_out.writestr(entry, zf_in.read(entry.filename)) os.replace(tmp_path, filepath) except Exception as exc: if os.path.exists(tmp_path): os.unlink(tmp_path) FreeCAD.Console.PrintWarning(f"Failed to clean template ZIP: {exc}\n") def open_item(self, part_number: str): """Open or create item document.""" existing_path = find_file_by_part_number(part_number) if existing_path and existing_path.exists(): return FreeCAD.openDocument(str(existing_path)) try: item = self.client.get_item(part_number) return self.create_document_for_item(item, save=True) except Exception as e: FreeCAD.Console.PrintError(f"Failed to open: {e}\n") return None def upload_file( self, part_number: str, file_path: str, comment: str = "Auto-save" ) -> Optional[Dict]: """Upload file to the server.""" try: doc = FreeCAD.openDocument(file_path) if not doc: return None properties = collect_document_properties(doc) FreeCAD.closeDocument(doc.Name) return self.client._upload_file(part_number, file_path, properties, comment) except Exception as e: FreeCAD.Console.PrintError(f"Upload failed: {e}\n") return None def download_file(self, part_number: str) -> Optional[Path]: """Download the latest revision file from the server.""" try: item = self.client.get_item(part_number) file_path = get_cad_file_path(part_number, item.get("description", "")) file_path.parent.mkdir(parents=True, exist_ok=True) revisions = self.client.get_revisions(part_number) for rev in revisions: if rev.get("file_key"): if self.client._download_file( part_number, rev["revision_number"], str(file_path) ): return file_path return None except Exception as e: FreeCAD.Console.PrintError(f"Download failed: {e}\n") return None _sync = SiloSync() # --------------------------------------------------------------------------- # kindred:// URL handler # --------------------------------------------------------------------------- def handle_kindred_url(url: str): """Handle a ``kindred://`` URL by opening the referenced item. URL format:: kindred://item/{part_number} kindred://item/{part_number}/revision/{rev_number} Called from C++ ``MainWindow::processMessages()`` when a ``kindred://`` URL arrives via IPC, or from ``InitGui.py`` for cold-start URL arguments. """ from urllib.parse import urlparse parsed = urlparse(url) if parsed.scheme != "kindred": return # urlparse treats "kindred://item/PN-001" as netloc="item", path="/PN-001" parts = [parsed.netloc] + [p for p in parsed.path.split("/") if p] if len(parts) >= 2 and parts[0] == "item": part_number = parts[1] FreeCAD.Console.PrintMessage( f"Silo: Opening item {part_number} from kindred:// URL\n" ) _sync.open_item(part_number) # ============================================================================ # COMMANDS # ============================================================================ class Silo_Open: """Open item - combined search and open dialog.""" def GetResources(self): return { "MenuText": "Open", "ToolTip": "Search and open items (Ctrl+O)", "Pixmap": _icon("open"), } def Activated(self): from open_search import OpenItemWidget from PySide import QtGui, QtWidgets mw = FreeCADGui.getMainWindow() mdi = mw.findChild(QtWidgets.QMdiArea) if not mdi: return widget = OpenItemWidget(_client, search_local_files) sw = mdi.addSubWindow(widget) sw.setWindowTitle("Open Item") sw.setWindowIcon(QtGui.QIcon(_icon("open"))) sw.show() mdi.setActiveSubWindow(sw) def _on_selected(data): sw.close() if data.get("path"): FreeCAD.openDocument(data["path"]) else: _sync.open_item(data["part_number"]) widget.item_selected.connect(_on_selected) widget.cancelled.connect(sw.close) def IsActive(self): return True class Silo_New: """Create new item with part number. Opens a pre-document MDI tab containing the schema-driven creation form. Each invocation opens a new tab so multiple items can be prepared in parallel. On successful creation the tab closes and the real document opens in its place. """ def GetResources(self): return { "MenuText": "New", "ToolTip": "Create new item (Ctrl+N)", "Pixmap": _icon("new"), } def Activated(self): from PySide import QtGui, QtWidgets from schema_form import SchemaFormWidget mw = FreeCADGui.getMainWindow() mdi = mw.findChild(QtWidgets.QMdiArea) if not mdi: return # Each invocation creates a new pre-document tab form = SchemaFormWidget(_client) # Pre-fill description from current selection sel = FreeCADGui.Selection.getSelection() if sel: form._desc_edit.setText(sel[0].Label) # Add as MDI subwindow (appears as a tab alongside documents) sw = mdi.addSubWindow(form) sw.setWindowTitle("New Item") sw.setWindowIcon(QtGui.QIcon(_icon("new"))) sw.show() mdi.setActiveSubWindow(sw) # On creation: process result, close tab, open real document def _on_created(result): part_number = result["part_number"] try: sel_now = FreeCADGui.Selection.getSelection() if sel_now: obj = sel_now[0] set_silo_properties( obj, { "SiloItemId": result.get("id", ""), "SiloPartNumber": part_number, "SiloRevision": 1, }, ) obj.Label = part_number _sync.save_to_canonical_path( FreeCAD.ActiveDocument, force_rename=True ) else: template_path = result.get("_form_data", {}).get("template_path") if template_path: _sync.create_document_from_template( result, template_path, save=True ) else: _sync.create_document_for_item(result, save=True) FreeCAD.Console.PrintMessage(f"Created: {part_number}\n") except Exception as e: FreeCAD.Console.PrintError(f"Failed to process created item: {e}\n") # Close the pre-document tab sw.close() form.item_created.connect(_on_created) form.cancelled.connect(sw.close) def IsActive(self): return _server_mode == "normal" def _push_dag_after_upload(doc, part_number, revision_number): """Extract and push the feature DAG after a successful upload. Failures are logged as warnings -- DAG sync must never block save. """ try: from dag import extract_dag nodes, edges = extract_dag(doc) if not nodes: return result = _client.push_dag(part_number, revision_number, nodes, edges) node_count = result.get("node_count", len(nodes)) edge_count = result.get("edge_count", len(edges)) FreeCAD.Console.PrintMessage( f"DAG synced: {node_count} nodes, {edge_count} edges\n" ) except Exception as e: FreeCAD.Console.PrintWarning(f"DAG sync failed: {e}\n") def _update_manifest_revision(file_path, revision_number): """Write revision_hash into the .kc manifest after a successful upload. Failures are logged as warnings -- must never block save. """ try: from kc_format import update_manifest_fields update_manifest_fields( file_path, { "revision_hash": str(revision_number), }, ) except Exception as e: FreeCAD.Console.PrintWarning(f"Manifest revision update failed: {e}\n") def _push_bom_after_upload(doc, part_number, revision_number): """Extract and sync Assembly BOM after a successful upload. Only runs for Assembly documents with cross-document links. Failures are logged as warnings -- BOM sync must never block save. """ try: from bom_sync import sync_bom_after_upload result = sync_bom_after_upload(doc, part_number, _client) if result is None: return # Not an assembly or no cross-doc links parts = [] if result.added_count: parts.append(f"+{result.added_count} added") if result.updated_count: parts.append(f"~{result.updated_count} qty updated") if result.unreferenced_count: parts.append(f"!{result.unreferenced_count} unreferenced") if result.unresolved_count: FreeCAD.Console.PrintWarning( f"BOM sync: {result.unresolved_count} components " f"have no Silo part number\n" ) if parts: FreeCAD.Console.PrintMessage(f"BOM synced: {', '.join(parts)}\n") else: FreeCAD.Console.PrintMessage("BOM synced: no changes\n") for err in result.errors: FreeCAD.Console.PrintWarning(f"BOM sync error: {err}\n") except Exception as e: FreeCAD.Console.PrintWarning(f"BOM sync failed: {e}\n") class Silo_Save: """Save locally and upload to the server.""" def GetResources(self): return { "MenuText": "Save", "ToolTip": "Save locally and upload to server (Ctrl+S)", "Pixmap": _icon("save"), } def Activated(self): doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) # If not tracked, just do a regular FreeCAD save if not obj: if doc.FileName: doc.save() FreeCAD.Console.PrintMessage(f"Saved: {doc.FileName}\n") else: FreeCADGui.runCommand("Std_SaveAs", 0) return part_number = obj.SiloPartNumber # Check if document has unsaved changes gui_doc = FreeCADGui.getDocument(doc.Name) is_modified = gui_doc.Modified if gui_doc else True if gui_doc and not is_modified and doc.FileName: FreeCAD.Console.PrintMessage("No changes to save.\n") return # Collect properties BEFORE saving to avoid dirtying the document # (accessing Shape properties can trigger recompute) properties = collect_document_properties(doc) # Save locally file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: # Fallback to regular save if canonical path fails if doc.FileName: doc.save() file_path = Path(doc.FileName) else: FreeCAD.Console.PrintError("Could not determine save path\n") return # Force clear modified flag if save succeeded (needed for assemblies) if gui_doc and gui_doc.Modified: try: gui_doc.Modified = False except Exception: pass FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n") # Try to upload to server try: result = _client._upload_file( part_number, str(file_path), properties, "Auto-save" ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n") _push_dag_after_upload(doc, part_number, new_rev) _push_bom_after_upload(doc, part_number, new_rev) _update_manifest_revision(str(file_path), new_rev) except Exception as e: FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n") FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n") def IsActive(self): return FreeCAD.ActiveDocument is not None and _server_mode == "normal" class Silo_Commit: """Save as new revision with comment.""" def GetResources(self): return { "MenuText": "Commit", "ToolTip": "Save as new revision with comment (Ctrl+Shift+S)", "Pixmap": _icon("commit"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: FreeCAD.Console.PrintError( "No tracked object. Use 'New' to register first.\n" ) return part_number = obj.SiloPartNumber comment, ok = QtGui.QInputDialog.getText(None, "Commit", "Revision comment:") if not ok: return # Collect properties BEFORE saving to avoid dirtying the document properties = collect_document_properties(doc) try: file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: return result = _client._upload_file( part_number, str(file_path), properties, comment ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n") _push_dag_after_upload(doc, part_number, new_rev) _push_bom_after_upload(doc, part_number, new_rev) _update_manifest_revision(str(file_path), new_rev) except Exception as e: FreeCAD.Console.PrintError(f"Commit failed: {e}\n") def IsActive(self): return FreeCAD.ActiveDocument is not None and _server_mode == "normal" def _check_pull_conflicts(part_number, local_path, doc=None): """Check for conflicts between local file and server state. Returns a list of conflict description strings, or an empty list if clean. """ conflicts = [] # Check for unsaved changes in an open document if doc is not None: gui_doc = FreeCADGui.getDocument(doc.Name) if doc.Name else None if gui_doc and gui_doc.Modified: conflicts.append("Document has unsaved local changes.") # Check local revision vs server latest if doc is not None: obj = get_tracked_object(doc) if obj and hasattr(obj, "SiloRevision"): local_rev = getattr(obj, "SiloRevision", 0) latest = _client.latest_file_revision(part_number) if latest and local_rev and latest["revision_number"] > local_rev: conflicts.append( f"Local file is at revision {local_rev}, " f"server has revision {latest['revision_number']}." ) # Check local file mtime vs server timestamp if local_path and local_path.exists(): import datetime local_mtime = datetime.datetime.fromtimestamp( local_path.stat().st_mtime, tz=datetime.timezone.utc ) try: item = _client.get_item(part_number) server_updated = item.get("updated_at", "") if server_updated: # Parse ISO format timestamp server_dt = datetime.datetime.fromisoformat( server_updated.replace("Z", "+00:00") ) if server_dt > local_mtime: conflicts.append("Server version is newer than local file.") except Exception: pass return conflicts class SiloPullDialog: """Dialog for selecting which revision to pull.""" def __init__(self, part_number, revisions, parent=None): from PySide import QtCore, QtGui self._selected_revision = None self._dialog = QtGui.QDialog(parent) self._dialog.setWindowTitle(f"Pull - {part_number}") self._dialog.setMinimumWidth(600) self._dialog.setMinimumHeight(350) layout = QtGui.QVBoxLayout(self._dialog) info = QtGui.QLabel(f"Select a revision to download for {part_number}:") layout.addWidget(info) # Revision table self._table = QtGui.QTableWidget() self._table.setColumnCount(5) self._table.setHorizontalHeaderLabels( ["Rev", "Date", "Comment", "Status", "File"] ) self._table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) self._table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) self._table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) self._table.verticalHeader().setVisible(False) header = self._table.horizontalHeader() header.setStretchLastSection(True) # Populate rows file_revisions = [] self._table.setRowCount(len(revisions)) for i, rev in enumerate(revisions): rev_num = rev.get("revision_number", "") date = rev.get("created_at", "")[:16].replace("T", " ") comment = rev.get("comment", "") status = rev.get("status", "") has_file = "\u2713" if rev.get("file_key") else "" self._table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev_num))) self._table.setItem(i, 1, QtGui.QTableWidgetItem(date)) self._table.setItem(i, 2, QtGui.QTableWidgetItem(comment)) self._table.setItem(i, 3, QtGui.QTableWidgetItem(status)) file_item = QtGui.QTableWidgetItem(has_file) file_item.setTextAlignment(QtCore.Qt.AlignCenter) self._table.setItem(i, 4, file_item) if rev.get("file_key"): file_revisions.append(i) self._table.resizeColumnsToContents() layout.addWidget(self._table) # Pre-select the latest revision with a file if file_revisions: self._table.selectRow(file_revisions[0]) # Store revision data for lookup self._revisions = revisions # Buttons btn_layout = QtGui.QHBoxLayout() btn_layout.addStretch() download_btn = QtGui.QPushButton("Download") cancel_btn = QtGui.QPushButton("Cancel") download_btn.clicked.connect(self._on_download) cancel_btn.clicked.connect(self._dialog.reject) self._table.doubleClicked.connect(self._on_download) btn_layout.addWidget(download_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) def _on_download(self): row = self._table.currentRow() if row < 0: return rev = self._revisions[row] if not rev.get("file_key"): from PySide import QtGui QtGui.QMessageBox.information( self._dialog, "Pull", "Selected revision has no file attached." ) return self._selected_revision = rev self._dialog.accept() def exec_(self): if self._dialog.exec_() == 1: # QDialog.Accepted return self._selected_revision return None def _pull_dependencies(part_number, progress_callback=None): """Recursively pull all BOM children that have files on the server. Returns list of (part_number, dest_path) tuples for successfully pulled files. Skips children that already exist locally. """ pulled = [] try: bom = _client.get_bom(part_number) except Exception as e: FreeCAD.Console.PrintWarning(f"Could not fetch BOM for {part_number}: {e}\n") return pulled for entry in bom: child_pn = entry.get("child_part_number") if not child_pn: continue # Skip if already exists locally existing = find_file_by_part_number(child_pn) if existing and existing.exists(): FreeCAD.Console.PrintMessage( f" {child_pn}: already exists at {existing}\n" ) # Still recurse — this child may itself be an assembly with missing deps _pull_dependencies(child_pn, progress_callback) continue # Check if this child has a file on the server try: latest = _client.latest_file_revision(child_pn) except Exception: latest = None if not latest or not latest.get("file_key"): FreeCAD.Console.PrintMessage(f" {child_pn}: no file on server, skipping\n") continue # Determine destination path child_desc = entry.get("child_description", "") dest_path = get_cad_file_path(child_pn, child_desc) dest_path.parent.mkdir(parents=True, exist_ok=True) rev_num = latest["revision_number"] FreeCAD.Console.PrintMessage(f" Pulling {child_pn} rev {rev_num}...\n") try: ok = _client._download_file( child_pn, rev_num, str(dest_path), progress_callback=progress_callback ) if ok: pulled.append((child_pn, dest_path)) except Exception as e: FreeCAD.Console.PrintWarning(f" Failed to pull {child_pn}: {e}\n") # Recurse into child (it may be a sub-assembly) _pull_dependencies(child_pn, progress_callback) return pulled class Silo_Pull: """Download revision file from the server.""" def GetResources(self): return { "MenuText": "Pull", "ToolTip": "Download file with revision selection", "Pixmap": _icon("pull"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument part_number = None obj = None if doc: obj = get_tracked_object(doc) if obj: part_number = obj.SiloPartNumber if not part_number: part_number, ok = QtGui.QInputDialog.getText(None, "Pull", "Part number:") if not ok or not part_number: return part_number = part_number.strip().upper() # Fetch revisions from server try: revisions = _client.get_revisions(part_number) except Exception as e: QtGui.QMessageBox.warning(None, "Pull", f"Cannot reach server: {e}") return existing_local = find_file_by_part_number(part_number) # If no revisions have files, fall back to create-from-database has_any_file = any(r.get("file_key") for r in revisions) if not has_any_file: if existing_local: FreeCAD.Console.PrintMessage( f"Opening existing local file: {existing_local}\n" ) FreeCAD.openDocument(str(existing_local)) else: try: item = _client.get_item(part_number) new_doc = _sync.create_document_for_item(item, save=True) if new_doc: FreeCAD.Console.PrintMessage( f"Created local file for {part_number}\n" ) else: QtGui.QMessageBox.warning( None, "Pull", f"Failed to create document for {part_number}", ) except Exception as e: QtGui.QMessageBox.warning(None, "Pull", f"Failed: {e}") return # Conflict detection conflicts = _check_pull_conflicts(part_number, existing_local, doc) if conflicts: detail = "\n".join(f" - {c}" for c in conflicts) reply = QtGui.QMessageBox.warning( None, "Pull - Conflicts Detected", f"Potential conflicts found:\n{detail}\n\n" "Download anyway and overwrite local file?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return # Show revision selection dialog dlg = SiloPullDialog(part_number, revisions) selected = dlg.exec_() if selected is None: return rev_num = selected["revision_number"] # Determine destination path try: item = _client.get_item(part_number) except Exception: item = {} dest_path = get_cad_file_path(part_number, item.get("description", "")) dest_path.parent.mkdir(parents=True, exist_ok=True) # Download with progress progress = QtGui.QProgressDialog( f"Downloading {part_number} rev {rev_num}...", "Cancel", 0, 100 ) progress.setWindowModality(QtCore.Qt.WindowModal) progress.setMinimumDuration(0) progress.setValue(0) def on_progress(downloaded, total): if progress.wasCanceled(): return if total > 0: pct = int(downloaded * 100 / total) progress.setValue(min(pct, 99)) else: # Indeterminate - pulse between 0-90 progress.setValue(min(int(downloaded / 1024) % 90, 89)) QtGui.QApplication.processEvents() try: ok = _client._download_file( part_number, rev_num, str(dest_path), progress_callback=on_progress ) except Exception as e: progress.close() QtGui.QMessageBox.warning(None, "Pull", f"Download failed: {e}") return progress.setValue(100) progress.close() if not ok: QtGui.QMessageBox.warning(None, "Pull", "Download failed.") return FreeCAD.Console.PrintMessage(f"Pulled revision {rev_num} of {part_number}\n") # Pull assembly dependencies before opening so links resolve if item.get("item_type") == "assembly": progress.setLabelText(f"Pulling dependencies for {part_number}...") progress.setValue(0) progress.show() dep_pulled = _pull_dependencies(part_number, progress_callback=on_progress) progress.setValue(100) progress.close() if dep_pulled: FreeCAD.Console.PrintMessage( f"Pulled {len(dep_pulled)} dependency file(s)\n" ) # Close existing document if open, then reopen if doc and doc.FileName == str(dest_path): FreeCAD.closeDocument(doc.Name) FreeCAD.openDocument(str(dest_path)) # Update SiloRevision property on the tracked object new_doc = FreeCAD.ActiveDocument if new_doc: new_obj = get_tracked_object(new_doc) if new_obj and hasattr(new_obj, "SiloRevision"): new_obj.SiloRevision = rev_num new_doc.save() def IsActive(self): return True class Silo_Push: """Upload local files to the server.""" def GetResources(self): return { "MenuText": "Push", "ToolTip": "Upload local files that aren't on the server", "Pixmap": _icon("push"), } def Activated(self): from datetime import datetime, timezone from PySide import QtGui # Find files that need uploading (no server file, or local is newer) local_files = search_local_files() unuploaded = [] for lf in local_files: pn = lf["part_number"] try: _client.get_item(pn) # Check if in DB server_rev = _client.latest_file_revision(pn) if not server_rev: # No file on server at all unuploaded.append(lf) else: # Compare local mtime against server revision timestamp try: local_mtime = os.path.getmtime(lf["path"]) server_time_str = server_rev.get("created_at", "") if server_time_str: server_dt = datetime.fromisoformat( server_time_str.replace("Z", "+00:00") ) local_dt = datetime.fromtimestamp( local_mtime, tz=timezone.utc ) if local_dt > server_dt: unuploaded.append(lf) else: # Can't parse server time, assume needs upload unuploaded.append(lf) except Exception: # On any comparison error, include it unuploaded.append(lf) except Exception: pass # Not in DB, skip if not unuploaded: QtGui.QMessageBox.information( None, "Push", "All local files are already uploaded." ) return msg = f"Found {len(unuploaded)} files to upload:\n\n" for item in unuploaded[:10]: msg += f" {item['part_number']}\n" if len(unuploaded) > 10: msg += f" ... and {len(unuploaded) - 10} more\n" msg += "\nUpload?" reply = QtGui.QMessageBox.question( None, "Push", msg, QtGui.QMessageBox.Yes | QtGui.QMessageBox.No ) if reply != QtGui.QMessageBox.Yes: return uploaded = 0 for item in unuploaded: result = _sync.upload_file( item["part_number"], item["path"], "Synced from local" ) if result: uploaded += 1 QtGui.QMessageBox.information(None, "Push", f"Uploaded {uploaded} files.") def IsActive(self): return _server_mode == "normal" class Silo_Info: """Show item status and revision history.""" def GetResources(self): return { "MenuText": "Info", "ToolTip": "Show item status and revision history", "Pixmap": _icon("info"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: FreeCAD.Console.PrintError("No tracked object\n") return part_number = obj.SiloPartNumber try: item = _client.get_item(part_number) revisions = _client.get_revisions(part_number) # Get projects for item try: projects = _client.get_item_projects(part_number) project_codes = [p.get("code", "") for p in projects if p.get("code")] except Exception: project_codes = [] msg = f"
Type: {item.get('item_type', '-')}
" msg += f"Description: {item.get('description', '-')}
" msg += f"Projects: {', '.join(project_codes) if project_codes else 'None'}
" msg += f"Current Revision: {item.get('current_revision', 1)}
" msg += f"Local Revision: {getattr(obj, 'SiloRevision', '-')}
" has_file, _ = _client.has_file(part_number) msg += f"File on Server: {'Yes' if has_file else 'No'}
" # Show current revision status if revisions: current_status = revisions[0].get("status", "draft") current_labels = revisions[0].get("labels", []) msg += f"Current Status: {current_status}
" if current_labels: msg += f"Labels: {', '.join(current_labels)}
" msg += "| Rev | Status | Date | File | Comment |
|---|---|---|---|---|
| {rev['revision_number']} | {status} | {date} | {file_icon} | {comment} |
{token}",
)
except Exception as e:
QtGui.QMessageBox.warning(
self.dialog,
"Registration Failed",
f"Failed to register runner:\n{e}",
)
self._refresh()
def _delete_runner(self):
from PySide import QtGui
row = self._table.currentRow()
if row < 0 or row >= len(self._runners):
return
runner = self._runners[row]
runner_name = runner.get("name", "")
runner_id = runner.get("id", "")
reply = QtGui.QMessageBox.question(
self.dialog,
"Delete Runner",
f"Delete runner {runner_name}?\n\nThis will invalidate its token.",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply != QtGui.QMessageBox.Yes:
return
try:
_client.delete_runner(runner_id)
FreeCAD.Console.PrintMessage(f"Silo: Runner {runner_name} deleted\n")
except Exception as e:
QtGui.QMessageBox.warning(
self.dialog,
"Delete Failed",
f"Failed to delete runner:\n{e}",
)
self._refresh()
def exec_(self):
self.dialog.exec_()
class Silo_Runners:
"""Manage compute runners (admin)."""
def GetResources(self):
return {
"MenuText": "Runners",
"ToolTip": "Manage compute runners (admin)",
"Pixmap": _icon("info"),
}
def Activated(self):
admin = RunnerAdminDialog(parent=FreeCADGui.getMainWindow())
admin.exec_()
def IsActive(self):
return _client.is_authenticated()
# ---------------------------------------------------------------------------
# Start panel
# ---------------------------------------------------------------------------
class SiloStartPanel:
"""Content widget for the Silo Start Panel dock."""
def __init__(self):
from PySide import QtCore, QtGui
self.widget = QtGui.QWidget()
self._build_ui()
self._refresh()
self._timer = QtCore.QTimer(self.widget)
self._timer.timeout.connect(self._refresh)
self._timer.start(60000) # Refresh every 60 seconds
def _build_ui(self):
from PySide import QtCore, QtGui
layout = QtGui.QVBoxLayout(self.widget)
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(6)
# Connection status badge
status_row = QtGui.QHBoxLayout()
status_row.setSpacing(6)
self._conn_dot = QtGui.QLabel("\u2b24")
self._conn_dot.setFixedWidth(16)
self._conn_dot.setAlignment(QtCore.Qt.AlignCenter)
self._conn_label = QtGui.QLabel("Checking...")
self._conn_label.setStyleSheet("font-weight: bold;")
status_row.addWidget(self._conn_dot)
status_row.addWidget(self._conn_label)
status_row.addStretch()
layout.addLayout(status_row)
layout.addSpacing(8)
# My Checkouts section
checkout_header = QtGui.QLabel("My Checkouts")
checkout_header.setStyleSheet("font-weight: bold; font-size: 12px;")
layout.addWidget(checkout_header)
self._checkout_list = QtGui.QListWidget()
self._checkout_list.setMaximumHeight(160)
self._checkout_list.setAlternatingRowColors(True)
self._checkout_list.itemDoubleClicked.connect(self._on_checkout_clicked)
layout.addWidget(self._checkout_list)
layout.addSpacing(8)
# Recent Silo Activity section
activity_header = QtGui.QLabel("Recent Silo Activity")
activity_header.setStyleSheet("font-weight: bold; font-size: 12px;")
layout.addWidget(activity_header)
self._activity_list = QtGui.QListWidget()
self._activity_list.setMaximumHeight(200)
self._activity_list.setAlternatingRowColors(True)
self._activity_list.itemDoubleClicked.connect(self._on_activity_clicked)
layout.addWidget(self._activity_list)
layout.addSpacing(8)
# Local Recent Files section
local_header = QtGui.QLabel("Local Recent Files")
local_header.setStyleSheet("font-weight: bold; font-size: 12px;")
layout.addWidget(local_header)
self._local_list = QtGui.QListWidget()
self._local_list.setMaximumHeight(160)
self._local_list.setAlternatingRowColors(True)
self._local_list.itemDoubleClicked.connect(self._on_local_clicked)
layout.addWidget(self._local_list)
layout.addStretch()
def _refresh(self):
self._refresh_connection()
self._refresh_checkouts()
self._refresh_activity()
self._refresh_local()
def _refresh_connection(self):
try:
reachable, _ = _client.check_connection()
except Exception:
reachable = False
if reachable:
api_url = _get_api_url()
parsed = urllib.parse.urlparse(api_url)
hostname = parsed.hostname or api_url
self._conn_dot.setStyleSheet("color: #4CAF50; font-size: 10px;")
self._conn_label.setText(f"Connected to {hostname}")
else:
self._conn_dot.setStyleSheet("color: #888; font-size: 10px;")
self._conn_label.setText("Disconnected")
def _refresh_checkouts(self):
self._checkout_list.clear()
try:
local_files = search_local_files()
for item in local_files[:15]:
pn = item.get("part_number", "")
desc = item.get("description", "")
modified = (item.get("modified") or "")[:10]
label = f"{pn} {desc}"
if modified:
label += f" ({modified})"
list_item = self._checkout_list.addItem(label)
except Exception:
self._checkout_list.addItem("(Unable to scan local files)")
if self._checkout_list.count() == 0:
self._checkout_list.addItem("(No local checkouts)")
def _refresh_activity(self):
self._activity_list.clear()
try:
reachable, _ = _client.check_connection()
except Exception:
reachable = False
if not reachable:
self._activity_list.addItem("(Not connected)")
return
try:
items = _client.list_items()
if isinstance(items, list):
# Collect local part numbers for badge comparison
local_pns = set()
try:
for lf in search_local_files():
local_pns.add(lf.get("part_number", ""))
except Exception:
pass
for item in items[:10]:
pn = item.get("part_number", "")
desc = item.get("description", "")
updated = (item.get("updated_at") or "")[:10]
badge = "\u2713 " if pn in local_pns else ""
label = f"{badge}{pn} {desc}"
if updated:
label += f" ({updated})"
self._activity_list.addItem(label)
if self._activity_list.count() == 0:
self._activity_list.addItem("(No items in database)")
except Exception:
self._activity_list.addItem("(Unable to fetch activity)")
def _refresh_local(self):
from PySide import QtGui
self._local_list.clear()
try:
param = FreeCAD.ParamGet("User parameter:BaseApp/RecentFiles")
count = param.GetInt("RecentFiles", 0)
for i in range(min(count, 10)):
path = param.GetString(f"MRU{i}", "")
if path:
name = Path(path).name
item = QtGui.QListWidgetItem(name)
item.setToolTip(path)
item.setData(256, path) # Qt.UserRole = 256
self._local_list.addItem(item)
except Exception:
pass
if self._local_list.count() == 0:
self._local_list.addItem("(No recent files)")
def _on_checkout_clicked(self, item):
"""Open a local checkout file."""
text = item.text()
if text.startswith("("):
return
pn = text.split()[0] if text else ""
if not pn:
return
local_path = find_file_by_part_number(pn)
if local_path and local_path.exists():
FreeCAD.openDocument(str(local_path))
def _on_activity_clicked(self, item):
"""Open/checkout a remote item."""
text = item.text()
if text.startswith("("):
return
# Strip badge if present
text = text.lstrip("\u2713 ")
pn = text.split()[0] if text else ""
if not pn:
return
local_path = find_file_by_part_number(pn)
if local_path and local_path.exists():
FreeCAD.openDocument(str(local_path))
else:
_sync.open_item(pn)
def _on_local_clicked(self, item):
"""Open a local recent file."""
path = item.data(256) # Qt.UserRole
if path and Path(path).exists():
FreeCAD.openDocument(path)
class Silo_StartPanel:
"""Show the Silo Start Panel."""
def GetResources(self):
return {
"MenuText": "Start Panel",
"ToolTip": "Show Silo start panel with checkouts and recent activity",
"Pixmap": _icon("open"),
}
def Activated(self):
from PySide import QtCore, QtGui
mw = FreeCADGui.getMainWindow()
if mw is None:
return
# Reuse existing panel if it exists
panel = mw.findChild(QtGui.QDockWidget, "SiloStartPanel")
if panel:
panel.show()
panel.raise_()
return
# Create new dock widget
content = SiloStartPanel()
dock = QtGui.QDockWidget("Silo", mw)
dock.setObjectName("SiloStartPanel")
dock.setWidget(content.widget)
dock.setAllowedAreas(
QtCore.Qt.LeftDockWidgetArea | QtCore.Qt.RightDockWidgetArea
)
mw.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock)
def IsActive(self):
return True
class SaveAsTemplateDialog:
"""Dialog to capture template metadata for Save as Template."""
_ITEM_TYPES = ["part", "assembly", "consumable", "tool"]
def __init__(self, doc, parent=None):
self._doc = doc
self._parent = parent
def exec_(self):
"""Show dialog. Returns template_info dict or None if cancelled."""
from PySide import QtGui, QtWidgets
dlg = QtWidgets.QDialog(self._parent)
dlg.setWindowTitle("Save as Template")
dlg.setMinimumWidth(420)
layout = QtWidgets.QVBoxLayout(dlg)
form = QtWidgets.QFormLayout()
form.setSpacing(6)
# Pre-populate defaults from document state
default_name = self._doc.Label or Path(self._doc.FileName).stem
obj = get_tracked_object(self._doc)
default_author = _get_auth_username() or os.environ.get("USER", "")
default_item_type = ""
default_category = ""
if obj:
if hasattr(obj, "SiloItemType"):
default_item_type = getattr(obj, "SiloItemType", "")
if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber:
default_category, _ = _parse_part_number(obj.SiloPartNumber)
# Name (required)
name_edit = QtWidgets.QLineEdit(default_name)
name_edit.setPlaceholderText("Template display name")
form.addRow("Name:", name_edit)
# Description
desc_edit = QtWidgets.QLineEdit()
desc_edit.setPlaceholderText("What this template is for")
form.addRow("Description:", desc_edit)
# Item Types (multi-select)
type_list = QtWidgets.QListWidget()
type_list.setSelectionMode(QtWidgets.QAbstractItemView.MultiSelection)
type_list.setMaximumHeight(80)
for t in self._ITEM_TYPES:
item = QtWidgets.QListWidgetItem(t.capitalize())
item.setData(QtCore.Qt.UserRole, t)
type_list.addItem(item)
if t == default_item_type:
item.setSelected(True)
form.addRow("Item Types:", type_list)
# Categories (comma-separated prefixes)
cat_edit = QtWidgets.QLineEdit(default_category)
cat_edit.setPlaceholderText("e.g. F, M01 (empty = all)")
form.addRow("Categories:", cat_edit)
# Author
author_edit = QtWidgets.QLineEdit(default_author)
form.addRow("Author:", author_edit)
# Tags (comma-separated)
tags_edit = QtWidgets.QLineEdit()
tags_edit.setPlaceholderText("e.g. sheet metal, fabrication")
form.addRow("Tags:", tags_edit)
layout.addLayout(form)
# Buttons
btn_layout = QtWidgets.QHBoxLayout()
btn_layout.addStretch()
cancel_btn = QtWidgets.QPushButton("Cancel")
cancel_btn.clicked.connect(dlg.reject)
btn_layout.addWidget(cancel_btn)
save_btn = QtWidgets.QPushButton("Save Template")
save_btn.setDefault(True)
save_btn.clicked.connect(dlg.accept)
btn_layout.addWidget(save_btn)
layout.addLayout(btn_layout)
if dlg.exec_() != QtWidgets.QDialog.Accepted:
return None
name = name_edit.text().strip()
if not name:
return None
selected_types = []
for i in range(type_list.count()):
item = type_list.item(i)
if item.isSelected():
selected_types.append(item.data(QtCore.Qt.UserRole))
categories = [c.strip() for c in cat_edit.text().split(",") if c.strip()]
tags = [t.strip() for t in tags_edit.text().split(",") if t.strip()]
return {
"template_version": "1.0",
"name": name,
"description": desc_edit.text().strip(),
"item_types": selected_types,
"categories": categories,
"icon": "",
"author": author_edit.text().strip(),
"tags": tags,
}
class Silo_SaveAsTemplate:
"""Save a copy of the current document as a reusable template."""
def GetResources(self):
return {
"MenuText": "Save as Template",
"ToolTip": "Save a copy of this document as a reusable template",
"Pixmap": _icon("new"),
}
def Activated(self):
import shutil
from PySide import QtGui
from templates import get_default_template_dir, inject_template_json
doc = FreeCAD.ActiveDocument
if not doc:
return
if not doc.FileName:
QtGui.QMessageBox.warning(
None,
"Save as Template",
"Please save the document first.",
)
return
# Capture template metadata from user
dialog = SaveAsTemplateDialog(doc)
template_info = dialog.exec_()
if not template_info:
return
# Determine destination
dest_dir = get_default_template_dir()
filename = _sanitize_filename(template_info["name"]) + ".kc"
dest = os.path.join(dest_dir, filename)
# Check for overwrite
if os.path.exists(dest):
reply = QtGui.QMessageBox.question(
None,
"Save as Template",
f"Template '{filename}' already exists. Overwrite?",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply != QtGui.QMessageBox.Yes:
return
# Save current state, then copy
doc.save()
shutil.copy2(doc.FileName, dest)
# Strip Silo identity and inject template descriptor
_sync._clean_template_zip(dest)
inject_template_json(dest, template_info)
FreeCAD.Console.PrintMessage(f"Template saved: {dest}\n")
# Offer Silo upload if connected
if _server_mode == "normal":
reply = QtGui.QMessageBox.question(
None,
"Save as Template",
"Upload template to Silo for team sharing?",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply == QtGui.QMessageBox.Yes:
self._upload_to_silo(dest, template_info)
QtGui.QMessageBox.information(
None,
"Save as Template",
f"Template '{template_info['name']}' saved.",
)
@staticmethod
def _upload_to_silo(file_path, template_info):
"""Upload template to Silo as a shared item. Non-blocking on failure."""
try:
schema = _get_schema_name()
result = _client.create_item(
schema, "T00", template_info.get("name", "Template")
)
part_number = result["part_number"]
_client._upload_file(part_number, file_path, {}, "Template upload")
FreeCAD.Console.PrintMessage(
f"Template uploaded to Silo as {part_number}\n"
)
except Exception as e:
FreeCAD.Console.PrintWarning(
f"Template upload to Silo failed (saved locally): {e}\n"
)
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class _DiagWorker(QtCore.QThread):
"""Background worker that runs connectivity diagnostics."""
result = QtCore.Signal(str, bool, str) # (test_name, passed, detail)
finished_all = QtCore.Signal()
_HTTP_TIMEOUT = 10
_DNS_TIMEOUT = 5
def run(self):
api_url = _get_api_url()
base_url = api_url.rstrip("/")
if base_url.endswith("/api"):
base_url = base_url[:-4]
self._test_dns(base_url)
self._test_http("Health", f"{base_url}/health", auth=False)
self._test_http("Ready", f"{base_url}/ready", auth=False)
self._test_http("Auth", f"{api_url.rstrip('/')}/auth/me", auth=True)
self._test_sse(api_url)
self.finished_all.emit()
def _test_dns(self, base_url):
parsed = urllib.parse.urlparse(base_url)
hostname = parsed.hostname
if not hostname:
self.result.emit("DNS", False, "no hostname in URL")
return
try:
addrs = socket.getaddrinfo(
hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
)
first_ip = addrs[0][4][0] if addrs else "?"
self.result.emit("DNS", True, f"{hostname} -> {first_ip}")
except socket.gaierror as e:
self.result.emit("DNS", False, f"{hostname}: {e}")
except Exception as e:
self.result.emit("DNS", False, str(e))
def _test_http(self, name, url, auth=False):
try:
headers = {}
if auth:
headers.update(_get_auth_headers())
req = urllib.request.Request(url, headers=headers, method="GET")
resp = urllib.request.urlopen(
req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
)
code = resp.getcode()
body = resp.read(4096).decode("utf-8", errors="replace").strip()
detail = f"HTTP {code}"
try:
data = json.loads(body)
if isinstance(data, dict):
parts = []
for key in ("status", "username", "role"):
if key in data:
parts.append(f"{key}={data[key]}")
if parts:
detail += f" ({', '.join(parts)})"
except (json.JSONDecodeError, ValueError):
if len(body) <= 80:
detail += f" {body}"
self.result.emit(name, code < 400, detail)
except urllib.error.HTTPError as e:
self.result.emit(name, False, f"HTTP {e.code} {e.reason}")
except urllib.error.URLError as e:
self.result.emit(name, False, str(e.reason))
except Exception as e:
self.result.emit(name, False, str(e))
def _test_sse(self, api_url):
url = f"{api_url.rstrip('/')}/events"
try:
headers = {"Accept": "text/event-stream"}
headers.update(_get_auth_headers())
req = urllib.request.Request(url, headers=headers, method="GET")
resp = urllib.request.urlopen(
req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
)
content_type = resp.headers.get("Content-Type", "")
code = resp.getcode()
resp.close()
if "text/event-stream" in content_type:
self.result.emit("SSE", True, f"HTTP {code} event-stream")
else:
self.result.emit("SSE", True, f"HTTP {code} (type: {content_type})")
except urllib.error.HTTPError as e:
if e.code in (404, 501):
self.result.emit("SSE", False, f"HTTP {e.code} (not supported)")
else:
self.result.emit("SSE", False, f"HTTP {e.code} {e.reason}")
except urllib.error.URLError as e:
self.result.emit("SSE", False, str(e.reason))
except Exception as e:
self.result.emit("SSE", False, str(e))
class Silo_Diag:
"""Command to run connection diagnostics."""
_worker = None
def GetResources(self):
return {
"MenuText": "Diagnostics",
"ToolTip": "Test DNS, health, readiness, auth, and SSE connectivity",
"Pixmap": _icon("info"),
}
def Activated(self):
if self._worker is not None and self._worker.isRunning():
FreeCAD.Console.PrintWarning("Silo: diagnostics already running\n")
return
api_url = _get_api_url()
FreeCAD.Console.PrintMessage(f"--- Silo Diagnostics ({api_url}) ---\n")
self._worker = _DiagWorker()
self._worker.result.connect(self._on_result)
self._worker.finished_all.connect(self._on_finished)
self._worker.start()
def _on_result(self, name, passed, detail):
if passed:
FreeCAD.Console.PrintMessage(f" PASS {name}: {detail}\n")
else:
FreeCAD.Console.PrintError(f" FAIL {name}: {detail}\n")
def _on_finished(self):
FreeCAD.Console.PrintMessage("--- Diagnostics complete ---\n")
self._worker = None
def IsActive(self):
return True
# Register commands
FreeCADGui.addCommand("Silo_Open", Silo_Open())
FreeCADGui.addCommand("Silo_New", Silo_New())
FreeCADGui.addCommand("Silo_Save", Silo_Save())
FreeCADGui.addCommand("Silo_Commit", Silo_Commit())
FreeCADGui.addCommand("Silo_Pull", Silo_Pull())
FreeCADGui.addCommand("Silo_Push", Silo_Push())
FreeCADGui.addCommand("Silo_Info", Silo_Info())
FreeCADGui.addCommand("Silo_BOM", Silo_BOM())
FreeCADGui.addCommand("Silo_TagProjects", Silo_TagProjects())
FreeCADGui.addCommand("Silo_Rollback", Silo_Rollback())
FreeCADGui.addCommand("Silo_SetStatus", Silo_SetStatus())
FreeCADGui.addCommand("Silo_Settings", Silo_Settings())
FreeCADGui.addCommand("Silo_Auth", Silo_Auth())
FreeCADGui.addCommand("Silo_StartPanel", Silo_StartPanel())
FreeCADGui.addCommand("Silo_Diag", Silo_Diag())
FreeCADGui.addCommand("Silo_Jobs", Silo_Jobs())
FreeCADGui.addCommand("Silo_Runners", Silo_Runners())
FreeCADGui.addCommand("Silo_SaveAsTemplate", Silo_SaveAsTemplate())