"""Silo FreeCAD commands - Streamlined workflow for CAD file management.""" import json import os import re import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import FreeCAD import FreeCADGui # Configuration SILO_API_URL = os.environ.get("SILO_API_URL", "http://localhost:8080/api") SILO_PROJECTS_DIR = os.environ.get( "SILO_PROJECTS_DIR", os.path.expanduser("~/projects") ) # Icon directory def _get_icon_dir(): """Get the icons directory path.""" locations = [ os.path.join(FreeCAD.getUserAppDataDir(), "Mod", "Silo", "resources", "icons"), os.path.expanduser( "~/.var/app/org.freecad.FreeCAD/data/FreeCAD/Mod/Silo/resources/icons" ), os.path.expanduser("~/.FreeCAD/Mod/Silo/resources/icons"), ] for loc in locations: if os.path.isdir(loc): return loc return "" _ICON_DIR = _get_icon_dir() def _icon(name): """Get icon path by name.""" if _ICON_DIR: path = os.path.join(_ICON_DIR, f"silo-{name}.svg") if os.path.exists(path): return path return "" def get_projects_dir() -> Path: """Get the projects directory.""" projects_dir = Path(SILO_PROJECTS_DIR) projects_dir.mkdir(parents=True, exist_ok=True) return projects_dir class SiloClient: """HTTP client for Silo API.""" def __init__(self, base_url: str = SILO_API_URL): self.base_url = base_url.rstrip("/") def _request( self, method: str, path: str, data: Optional[Dict] = None ) -> Dict[str, Any]: """Make HTTP request to Silo API.""" url = f"{self.base_url}{path}" headers = {"Content-Type": "application/json"} body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, headers=headers, method=method) try: with urllib.request.urlopen(req) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: error_body = e.read().decode() raise RuntimeError(f"API error {e.code}: {error_body}") except urllib.error.URLError as e: raise RuntimeError(f"Connection error: {e.reason}") def _download_file(self, part_number: str, revision: int, dest_path: str) -> bool: """Download a file from MinIO storage.""" url = f"{self.base_url}/items/{part_number}/file/{revision}" req = urllib.request.Request(url, method="GET") try: with urllib.request.urlopen(req) as resp: with open(dest_path, "wb") as f: while True: chunk = resp.read(8192) if not chunk: break f.write(chunk) return True except urllib.error.HTTPError as e: if e.code == 404: return False raise RuntimeError(f"Download error {e.code}: {e.read().decode()}") except urllib.error.URLError as e: raise RuntimeError(f"Connection error: {e.reason}") def _upload_file( self, part_number: str, file_path: str, properties: Dict, comment: str = "" ) -> Dict[str, Any]: """Upload a file and create a new revision.""" import mimetypes url = f"{self.base_url}/items/{part_number}/file" with open(file_path, "rb") as f: file_data = f.read() boundary = "----SiloUploadBoundary" + str(hash(file_path))[-8:] body_parts = [] filename = os.path.basename(file_path) content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" body_parts.append( f'--{boundary}\r\nContent-Disposition: form-data; name="file"; filename="{filename}"\r\nContent-Type: {content_type}\r\n\r\n' ) body_parts.append(file_data) body_parts.append(b"\r\n") if comment: body_parts.append( f'--{boundary}\r\nContent-Disposition: form-data; name="comment"\r\n\r\n{comment}\r\n' ) if properties: # Ensure properties is valid JSON - handle special float values props_json = json.dumps(properties, allow_nan=False, default=str) body_parts.append( f'--{boundary}\r\nContent-Disposition: form-data; name="properties"\r\n\r\n{props_json}\r\n' ) body_parts.append(f"--{boundary}--\r\n") body = b"" for part in body_parts: body += part.encode("utf-8") if isinstance(part, str) else part headers = { "Content-Type": f"multipart/form-data; boundary={boundary}", "Content-Length": str(len(body)), } req = urllib.request.Request(url, data=body, headers=headers, method="POST") try: with urllib.request.urlopen(req) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}") except urllib.error.URLError as e: raise RuntimeError(f"Connection error: {e.reason}") def get_item(self, part_number: str) -> Dict[str, Any]: return self._request("GET", f"/items/{part_number}") def list_items( self, search: str = "", item_type: str = "", project: str = "" ) -> list: params = ["limit=100"] if search: params.append(f"search={urllib.parse.quote(search)}") if item_type: params.append(f"type={item_type}") if project: params.append(f"project={project}") return self._request("GET", "/items?" + "&".join(params)) def create_item( self, schema: str, project: str, category: str, description: str = "" ) -> Dict[str, Any]: return self._request( "POST", "/items", { "schema": schema, "project": project, "category": category, "description": description, }, ) def update_item( self, part_number: str, description: str = None, item_type: str = None ) -> Dict[str, Any]: data = {} if description is not None: data["description"] = description if item_type is not None: data["item_type"] = item_type return self._request("PUT", f"/items/{part_number}", data) def get_revisions(self, part_number: str) -> list: return self._request("GET", f"/items/{part_number}/revisions") def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]: return self._request("GET", f"/schemas/{name}") def get_projects(self) -> list: return self._request("GET", "/projects") def has_file(self, part_number: str) -> Tuple[bool, Optional[int]]: """Check if item has files in MinIO.""" try: revisions = self.get_revisions(part_number) for rev in revisions: if rev.get("file_key"): return True, rev["revision_number"] return False, None except Exception: return False, None _client = SiloClient() # Utility functions def sanitize_filename(name: str) -> str: """Sanitize a string for use in filenames.""" sanitized = re.sub(r'[<>:"/\\|?*]', "_", name) sanitized = re.sub(r"[\s_]+", "_", sanitized) sanitized = sanitized.strip("_ ") return sanitized[:50] def parse_part_number(part_number: str) -> Tuple[str, str, str]: """Parse part number into (project, category, sequence).""" parts = part_number.split("-") if len(parts) >= 3: return parts[0], parts[1], parts[2] return part_number, "", "" def get_cad_file_path(part_number: str, description: str = "") -> Path: """Generate canonical file path for a CAD file.""" project_code, _, _ = parse_part_number(part_number) if description: filename = f"{part_number}_{sanitize_filename(description)}.FCStd" else: filename = f"{part_number}.FCStd" return get_projects_dir() / project_code.lower() / "cad" / filename def find_file_by_part_number(part_number: str) -> Optional[Path]: """Find existing CAD file for a part number.""" project_code, _, _ = parse_part_number(part_number) cad_dir = get_projects_dir() / project_code.lower() / "cad" if not cad_dir.exists(): return None matches = list(cad_dir.glob(f"{part_number}*.FCStd")) return matches[0] if matches else None def search_local_files(search_term: str = "", project_filter: str = "") -> list: """Search for CAD files in local projects directory.""" results = [] base_dir = get_projects_dir() if not base_dir.exists(): return results search_lower = search_term.lower() for project_dir in base_dir.iterdir(): if not project_dir.is_dir(): continue if project_filter and project_dir.name.lower() != project_filter.lower(): continue cad_dir = project_dir / "cad" if not cad_dir.exists(): continue for fcstd_file in cad_dir.glob("*.FCStd"): filename = fcstd_file.stem parts = filename.split("_", 1) part_number = parts[0] description = parts[1].replace("_", " ") if len(parts) > 1 else "" if search_term: searchable = f"{part_number} {description}".lower() if search_lower not in searchable: continue try: from datetime import datetime mtime = fcstd_file.stat().st_mtime modified = datetime.fromtimestamp(mtime).isoformat() except Exception: modified = None results.append( { "path": str(fcstd_file), "part_number": part_number, "description": description, "project": project_dir.name.upper(), "modified": modified, "source": "local", } ) results.sort(key=lambda x: x.get("modified") or "", reverse=True) return results def _safe_float(val): """Convert float to JSON-safe value, handling NaN and Infinity.""" import math if isinstance(val, float): if math.isnan(val) or math.isinf(val): return 0.0 return val def collect_document_properties(doc) -> Dict[str, Any]: """Collect properties from all objects in a document.""" result = { "_document_name": doc.Name, "_file_name": doc.FileName or None, "objects": {}, } for obj in doc.Objects: if obj.TypeId.startswith("App::") and obj.TypeId not in ("App::Part",): continue props = {"_object_type": obj.TypeId, "_label": obj.Label} if hasattr(obj, "Placement"): p = obj.Placement props["placement"] = { "position": { "x": _safe_float(p.Base.x), "y": _safe_float(p.Base.y), "z": _safe_float(p.Base.z), }, "rotation": { "axis": { "x": _safe_float(p.Rotation.Axis.x), "y": _safe_float(p.Rotation.Axis.y), "z": _safe_float(p.Rotation.Axis.z), }, "angle": _safe_float(p.Rotation.Angle), }, } if hasattr(obj, "Shape") and obj.Shape: try: bbox = obj.Shape.BoundBox props["bounding_box"] = { "x_length": _safe_float(bbox.XLength), "y_length": _safe_float(bbox.YLength), "z_length": _safe_float(bbox.ZLength), } if hasattr(obj.Shape, "Volume"): props["volume"] = _safe_float(obj.Shape.Volume) except Exception: pass result["objects"][obj.Label] = props return result def set_silo_properties(obj, props: Dict[str, Any]): """Set Silo properties on FreeCAD object.""" for name, value in props.items(): if not hasattr(obj, name): if isinstance(value, str): obj.addProperty("App::PropertyString", name, "Silo", "") elif isinstance(value, int): obj.addProperty("App::PropertyInteger", name, "Silo", "") setattr(obj, name, value) def get_tracked_object(doc): """Find the primary tracked object in a document.""" for obj in doc.Objects: if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber: return obj return None class SiloSync: """Handles synchronization between FreeCAD and Silo.""" def __init__(self, client: SiloClient = None): self.client = client or _client def save_to_canonical_path(self, doc, force_rename: bool = False) -> Optional[Path]: """Save document to canonical path.""" obj = get_tracked_object(doc) if not obj: return None part_number = obj.SiloPartNumber try: item = self.client.get_item(part_number) description = item.get("description", "") new_path = get_cad_file_path(part_number, description) new_path.parent.mkdir(parents=True, exist_ok=True) existing_path = find_file_by_part_number(part_number) current_path = Path(doc.FileName) if doc.FileName else None # Use save() if already at the correct path, saveAs() only if path changes if current_path and current_path == new_path: doc.save() elif ( existing_path and existing_path != new_path and (force_rename or current_path == existing_path) ): doc.saveAs(str(new_path)) try: existing_path.unlink() except OSError: pass else: doc.saveAs(str(new_path)) return new_path except Exception as e: FreeCAD.Console.PrintError(f"Save failed: {e}\n") return None def create_document_for_item(self, item: Dict[str, Any], save: bool = True): """Create a new FreeCAD document for a database item.""" part_number = item.get("part_number", "") description = item.get("description", "") item_type = item.get("item_type", "part") if not part_number: return None doc = FreeCAD.newDocument(part_number) safe_name = "_" + part_number if item_type == "assembly": # Create an Assembly object for assembly items (FreeCAD 1.0+) try: assembly_obj = doc.addObject("Assembly::AssemblyObject", safe_name) assembly_obj.Label = part_number set_silo_properties( assembly_obj, { "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) except Exception as e: # Fallback to App::Part if Assembly workbench not available FreeCAD.Console.PrintWarning( f"Assembly workbench not available, using App::Part: {e}\n" ) part_obj = doc.addObject("App::Part", safe_name) part_obj.Label = part_number set_silo_properties( part_obj, { "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) else: # Create a Part container for non-assembly items part_obj = doc.addObject("App::Part", safe_name) part_obj.Label = part_number set_silo_properties( part_obj, { "SiloPartNumber": part_number, "SiloRevision": item.get("current_revision", 1), "SiloItemType": item_type, }, ) # Add a Body for parts (not assemblies) body_label = sanitize_filename(description) if description else "Body" body = doc.addObject("PartDesign::Body", "_" + body_label) body.Label = body_label part_obj.addObject(body) doc.recompute() if save: file_path = get_cad_file_path(part_number, description) file_path.parent.mkdir(parents=True, exist_ok=True) doc.saveAs(str(file_path)) return doc def open_item(self, part_number: str): """Open or create item document.""" existing_path = find_file_by_part_number(part_number) if existing_path and existing_path.exists(): return FreeCAD.openDocument(str(existing_path)) try: item = self.client.get_item(part_number) return self.create_document_for_item(item, save=True) except Exception as e: FreeCAD.Console.PrintError(f"Failed to open: {e}\n") return None def upload_file( self, part_number: str, file_path: str, comment: str = "Auto-save" ) -> Optional[Dict]: """Upload file to MinIO.""" try: doc = FreeCAD.openDocument(file_path) if not doc: return None properties = collect_document_properties(doc) FreeCAD.closeDocument(doc.Name) return self.client._upload_file(part_number, file_path, properties, comment) except Exception as e: FreeCAD.Console.PrintError(f"Upload failed: {e}\n") return None def download_file(self, part_number: str) -> Optional[Path]: """Download latest file from MinIO.""" try: item = self.client.get_item(part_number) file_path = get_cad_file_path(part_number, item.get("description", "")) file_path.parent.mkdir(parents=True, exist_ok=True) revisions = self.client.get_revisions(part_number) for rev in revisions: if rev.get("file_key"): if self.client._download_file( part_number, rev["revision_number"], str(file_path) ): return file_path return None except Exception as e: FreeCAD.Console.PrintError(f"Download failed: {e}\n") return None _sync = SiloSync() # ============================================================================ # COMMANDS # ============================================================================ class Silo_Open: """Open item - combined search and open dialog.""" def GetResources(self): return { "MenuText": "Open", "ToolTip": "Search and open items (Ctrl+O)", "Pixmap": _icon("open"), } def Activated(self): from PySide import QtCore, QtGui dialog = QtGui.QDialog() dialog.setWindowTitle("Silo - Open Item") dialog.setMinimumWidth(700) dialog.setMinimumHeight(500) layout = QtGui.QVBoxLayout(dialog) # Search row search_layout = QtGui.QHBoxLayout() search_input = QtGui.QLineEdit() search_input.setPlaceholderText("Search by part number or description...") search_layout.addWidget(search_input) layout.addLayout(search_layout) # Filters filter_layout = QtGui.QHBoxLayout() db_checkbox = QtGui.QCheckBox("Database") db_checkbox.setChecked(True) local_checkbox = QtGui.QCheckBox("Local Files") local_checkbox.setChecked(True) filter_layout.addWidget(db_checkbox) filter_layout.addWidget(local_checkbox) filter_layout.addStretch() layout.addLayout(filter_layout) # Results table results_table = QtGui.QTableWidget() results_table.setColumnCount(5) results_table.setHorizontalHeaderLabels( ["Part Number", "Description", "Type", "Source", "Modified"] ) results_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows) results_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection) results_table.horizontalHeader().setStretchLastSection(True) results_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) layout.addWidget(results_table) results_data = [] def do_search(): nonlocal results_data search_term = search_input.text().strip() results_data = [] results_table.setRowCount(0) if db_checkbox.isChecked(): try: for item in _client.list_items(search=search_term): results_data.append( { "part_number": item.get("part_number", ""), "description": item.get("description", ""), "item_type": item.get("item_type", ""), "source": "database", "modified": item.get("updated_at", "")[:10] if item.get("updated_at") else "", "path": None, } ) except Exception as e: FreeCAD.Console.PrintWarning(f"DB search failed: {e}\n") if local_checkbox.isChecked(): try: for item in search_local_files(search_term): existing = next( ( r for r in results_data if r["part_number"] == item["part_number"] ), None, ) if existing: existing["source"] = "both" existing["path"] = item.get("path") else: results_data.append( { "part_number": item.get("part_number", ""), "description": item.get("description", ""), "item_type": "", "source": "local", "modified": item.get("modified", "")[:10] if item.get("modified") else "", "path": item.get("path"), } ) except Exception as e: FreeCAD.Console.PrintWarning(f"Local search failed: {e}\n") results_table.setRowCount(len(results_data)) for row, data in enumerate(results_data): results_table.setItem( row, 0, QtGui.QTableWidgetItem(data["part_number"]) ) results_table.setItem( row, 1, QtGui.QTableWidgetItem(data["description"]) ) results_table.setItem(row, 2, QtGui.QTableWidgetItem(data["item_type"])) results_table.setItem(row, 3, QtGui.QTableWidgetItem(data["source"])) results_table.setItem(row, 4, QtGui.QTableWidgetItem(data["modified"])) results_table.resizeColumnsToContents() def open_selected(): selected = results_table.selectedItems() if not selected: return row = selected[0].row() data = results_data[row] if data["path"]: FreeCAD.openDocument(data["path"]) else: _sync.open_item(data["part_number"]) dialog.accept() search_input.textChanged.connect(lambda: do_search()) results_table.doubleClicked.connect(open_selected) # Buttons btn_layout = QtGui.QHBoxLayout() open_btn = QtGui.QPushButton("Open") open_btn.clicked.connect(open_selected) cancel_btn = QtGui.QPushButton("Cancel") cancel_btn.clicked.connect(dialog.reject) btn_layout.addStretch() btn_layout.addWidget(open_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) do_search() dialog.exec_() def IsActive(self): return True class Silo_New: """Create new item with part number.""" def GetResources(self): return { "MenuText": "New", "ToolTip": "Create new item (Ctrl+N)", "Pixmap": _icon("new"), } def Activated(self): from PySide import QtGui sel = FreeCADGui.Selection.getSelection() # Project code try: projects = _client.get_projects() if projects: project, ok = QtGui.QInputDialog.getItem( None, "New Item", "Project:", projects, 0, True ) else: project, ok = QtGui.QInputDialog.getText( None, "New Item", "Project code (5 chars):" ) except Exception: project, ok = QtGui.QInputDialog.getText( None, "New Item", "Project code (5 chars):" ) if not ok or not project: return project = project.upper().strip()[:5] # Category try: schema = _client.get_schema() categories = schema.get("segments", []) cat_segment = next( (s for s in categories if s.get("name") == "category"), None ) if cat_segment and cat_segment.get("values"): cat_list = [ f"{k} - {v}" for k, v in sorted(cat_segment["values"].items()) ] category_str, ok = QtGui.QInputDialog.getItem( None, "New Item", "Category:", cat_list, 0, False ) if not ok: return category = category_str.split(" - ")[0] else: category, ok = QtGui.QInputDialog.getText( None, "New Item", "Category code:" ) if not ok: return except Exception: category, ok = QtGui.QInputDialog.getText( None, "New Item", "Category code:" ) if not ok: return # Description default_desc = sel[0].Label if sel else "" description, ok = QtGui.QInputDialog.getText( None, "New Item", "Description:", text=default_desc ) if not ok: return try: result = _client.create_item("kindred-rd", project, category, description) part_number = result["part_number"] if sel: # Tag selected object obj = sel[0] set_silo_properties( obj, {"SiloPartNumber": part_number, "SiloRevision": 1} ) obj.Label = part_number _sync.save_to_canonical_path(FreeCAD.ActiveDocument, force_rename=True) else: # Create new document _sync.create_document_for_item(result, save=True) FreeCAD.Console.PrintMessage(f"Created: {part_number}\n") QtGui.QMessageBox.information( None, "Item Created", f"Part number: {part_number}" ) except Exception as e: QtGui.QMessageBox.critical(None, "Error", str(e)) def IsActive(self): return True class Silo_Save: """Save locally and upload to MinIO.""" def GetResources(self): return { "MenuText": "Save", "ToolTip": "Save locally and upload to MinIO (Ctrl+S)", "Pixmap": _icon("save"), } def Activated(self): doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) # If not tracked, just do a regular FreeCAD save if not obj: if doc.FileName: doc.save() FreeCAD.Console.PrintMessage(f"Saved: {doc.FileName}\n") else: FreeCADGui.runCommand("Std_SaveAs", 0) return part_number = obj.SiloPartNumber # Check if document has unsaved changes gui_doc = FreeCADGui.getDocument(doc.Name) is_modified = gui_doc.Modified if gui_doc else True FreeCAD.Console.PrintMessage( f"[DEBUG] Modified={is_modified}, FileName={doc.FileName}\n" ) if gui_doc and not is_modified and doc.FileName: FreeCAD.Console.PrintMessage("No changes to save.\n") return # Collect properties BEFORE saving to avoid dirtying the document # (accessing Shape properties can trigger recompute) FreeCAD.Console.PrintMessage("[DEBUG] Collecting properties...\n") properties = collect_document_properties(doc) # Check modified state after collecting properties is_modified_after_props = gui_doc.Modified if gui_doc else True FreeCAD.Console.PrintMessage( f"[DEBUG] After collect_properties: Modified={is_modified_after_props}\n" ) # Save locally FreeCAD.Console.PrintMessage("[DEBUG] Saving to canonical path...\n") file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: # Fallback to regular save if canonical path fails if doc.FileName: doc.save() file_path = Path(doc.FileName) else: FreeCAD.Console.PrintError("Could not determine save path\n") return # Check modified state after save is_modified_after_save = gui_doc.Modified if gui_doc else True FreeCAD.Console.PrintMessage( f"[DEBUG] After save: Modified={is_modified_after_save}\n" ) # Force clear modified flag if save succeeded (needed for assemblies) if is_modified_after_save and gui_doc: FreeCAD.Console.PrintMessage( "[DEBUG] Attempting to clear Modified flag...\n" ) try: gui_doc.Modified = False FreeCAD.Console.PrintMessage( f"[DEBUG] After force clear: Modified={gui_doc.Modified}\n" ) except Exception as e: FreeCAD.Console.PrintMessage(f"[DEBUG] Could not clear Modified: {e}\n") FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n") # Try to upload to MinIO try: result = _client._upload_file( part_number, str(file_path), properties, "Auto-save" ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n") # Check modified state after upload is_modified_after_upload = gui_doc.Modified if gui_doc else True FreeCAD.Console.PrintMessage( f"[DEBUG] After upload: Modified={is_modified_after_upload}\n" ) except Exception as e: FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n") FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n") def IsActive(self): return FreeCAD.ActiveDocument is not None class Silo_Commit: """Save as new revision with comment.""" def GetResources(self): return { "MenuText": "Commit", "ToolTip": "Save as new revision with comment (Ctrl+Shift+S)", "Pixmap": _icon("commit"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: FreeCAD.Console.PrintError( "No tracked object. Use 'New' to register first.\n" ) return part_number = obj.SiloPartNumber comment, ok = QtGui.QInputDialog.getText(None, "Commit", "Revision comment:") if not ok: return # Collect properties BEFORE saving to avoid dirtying the document properties = collect_document_properties(doc) try: file_path = _sync.save_to_canonical_path(doc, force_rename=True) if not file_path: return result = _client._upload_file( part_number, str(file_path), properties, comment ) new_rev = result["revision_number"] FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n") except Exception as e: FreeCAD.Console.PrintError(f"Commit failed: {e}\n") def IsActive(self): return FreeCAD.ActiveDocument is not None class Silo_Pull: """Download from MinIO / sync from database.""" def GetResources(self): return { "MenuText": "Pull", "ToolTip": "Download latest from MinIO or create from database", "Pixmap": _icon("pull"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument part_number = None if doc: obj = get_tracked_object(doc) if obj: part_number = obj.SiloPartNumber if not part_number: part_number, ok = QtGui.QInputDialog.getText(None, "Pull", "Part number:") if not ok or not part_number: return part_number = part_number.strip().upper() # Check if local file exists existing_local = find_file_by_part_number(part_number) # Check if file exists in MinIO has_file, rev_num = _client.has_file(part_number) if has_file: # File exists in MinIO if existing_local: # Local file exists - ask before overwriting reply = QtGui.QMessageBox.question( None, "Pull", f"Download revision {rev_num} and overwrite local file?", QtGui.QMessageBox.Yes | QtGui.QMessageBox.No, ) if reply != QtGui.QMessageBox.Yes: return # Download from MinIO (creates local file automatically) downloaded = _sync.download_file(part_number) if downloaded: FreeCAD.Console.PrintMessage(f"Downloaded: {downloaded}\n") # Automatically open the downloaded file FreeCAD.openDocument(str(downloaded)) else: QtGui.QMessageBox.warning(None, "Pull", "Download failed") else: # No file in MinIO - create from database if existing_local: # Local file already exists, just open it FreeCAD.Console.PrintMessage( f"Opening existing local file: {existing_local}\n" ) FreeCAD.openDocument(str(existing_local)) else: # No local file and no MinIO file - create new from DB try: item = _client.get_item(part_number) new_doc = _sync.create_document_for_item(item, save=True) if new_doc: FreeCAD.Console.PrintMessage( f"Created local file for {part_number}\n" ) else: QtGui.QMessageBox.warning( None, "Pull", f"Failed to create document for {part_number}" ) except Exception as e: QtGui.QMessageBox.warning(None, "Pull", f"Failed: {e}") def IsActive(self): return True class Silo_Push: """Upload local files to MinIO.""" def GetResources(self): return { "MenuText": "Push", "ToolTip": "Upload local files that aren't in MinIO", "Pixmap": _icon("push"), } def Activated(self): from PySide import QtGui # Find unuploaded files local_files = search_local_files() unuploaded = [] for lf in local_files: pn = lf["part_number"] try: _client.get_item(pn) # Check if in DB has_file, _ = _client.has_file(pn) if not has_file: unuploaded.append(lf) except Exception: pass # Not in DB, skip if not unuploaded: QtGui.QMessageBox.information( None, "Push", "All local files are already uploaded." ) return msg = f"Found {len(unuploaded)} files to upload:\n\n" for item in unuploaded[:10]: msg += f" {item['part_number']}\n" if len(unuploaded) > 10: msg += f" ... and {len(unuploaded) - 10} more\n" msg += "\nUpload?" reply = QtGui.QMessageBox.question( None, "Push", msg, QtGui.QMessageBox.Yes | QtGui.QMessageBox.No ) if reply != QtGui.QMessageBox.Yes: return uploaded = 0 for item in unuploaded: result = _sync.upload_file( item["part_number"], item["path"], "Synced from local" ) if result: uploaded += 1 QtGui.QMessageBox.information(None, "Push", f"Uploaded {uploaded} files.") def IsActive(self): return True class Silo_Info: """Show item status and revision history.""" def GetResources(self): return { "MenuText": "Info", "ToolTip": "Show item status and revision history", "Pixmap": _icon("info"), } def Activated(self): from PySide import QtGui doc = FreeCAD.ActiveDocument if not doc: FreeCAD.Console.PrintError("No active document\n") return obj = get_tracked_object(doc) if not obj: FreeCAD.Console.PrintError("No tracked object\n") return part_number = obj.SiloPartNumber try: item = _client.get_item(part_number) revisions = _client.get_revisions(part_number) msg = f"
Type: {item.get('item_type', '-')}
" msg += f"Description: {item.get('description', '-')}
" msg += f"Current Revision: {item.get('current_revision', 1)}
" msg += f"Local Revision: {getattr(obj, 'SiloRevision', '-')}
" has_file, _ = _client.has_file(part_number) msg += f"File in MinIO: {'Yes' if has_file else 'No'}
" msg += "| Rev | Date | File | Comment |
|---|---|---|---|
| {rev['revision_number']} | {date} | {file_icon} | {comment} |