3354 lines
118 KiB
Python
3354 lines
118 KiB
Python
"""Silo FreeCAD commands - Streamlined workflow for CAD file management."""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import FreeCAD
|
|
import FreeCADGui
|
|
from PySide import QtCore
|
|
from silo_client import (
|
|
CATEGORY_NAMES,
|
|
SiloClient,
|
|
SiloSettings,
|
|
get_category_folder_name,
|
|
parse_part_number,
|
|
sanitize_filename,
|
|
)
|
|
|
|
# Preference group for Kindred Silo settings
|
|
_PREF_GROUP = "User parameter:BaseApp/Preferences/Mod/KindredSilo"
|
|
|
|
# Configuration - preferences take priority over env vars
|
|
SILO_PROJECTS_DIR = os.environ.get("SILO_PROJECTS_DIR", os.path.expanduser("~/projects"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FreeCAD settings adapter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class FreeCADSiloSettings(SiloSettings):
|
|
"""SiloSettings backed by FreeCAD preferences."""
|
|
|
|
def get_api_url(self) -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
url = param.GetString("ApiUrl", "")
|
|
if not url:
|
|
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
|
|
url = url.rstrip("/")
|
|
if url:
|
|
parsed = urllib.parse.urlparse(url)
|
|
if not parsed.path or parsed.path == "/":
|
|
url = url + "/api"
|
|
return url
|
|
|
|
def get_api_token(self) -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
token = param.GetString("ApiToken", "")
|
|
if not token:
|
|
token = os.environ.get("SILO_API_TOKEN", "")
|
|
return token
|
|
|
|
def get_ssl_verify(self) -> bool:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
return param.GetBool("SslVerify", True)
|
|
|
|
def get_ssl_cert_path(self) -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
return param.GetString("SslCertPath", "")
|
|
|
|
def save_auth(self, username: str, role: str = "", source: str = "", token: str = ""):
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
param.SetString("AuthUsername", username)
|
|
param.SetString("AuthRole", role)
|
|
param.SetString("AuthSource", source)
|
|
if token:
|
|
param.SetString("ApiToken", token)
|
|
|
|
def clear_auth(self):
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
param.SetString("ApiToken", "")
|
|
param.SetString("AuthUsername", "")
|
|
param.SetString("AuthRole", "")
|
|
param.SetString("AuthSource", "")
|
|
|
|
|
|
_fc_settings = FreeCADSiloSettings()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth info helpers (read from preferences for UI display)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _get_auth_username() -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
return param.GetString("AuthUsername", "")
|
|
|
|
|
|
def _get_auth_role() -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
return param.GetString("AuthRole", "")
|
|
|
|
|
|
def _get_auth_source() -> str:
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
return param.GetString("AuthSource", "")
|
|
|
|
|
|
def _get_auth_token() -> str:
|
|
return _fc_settings.get_api_token()
|
|
|
|
|
|
# Thin wrappers so command classes can call these without refactoring.
|
|
# They delegate to the settings adapter or the shared client.
|
|
|
|
|
|
def _get_api_url() -> str:
|
|
return _fc_settings.get_api_url()
|
|
|
|
|
|
def _get_ssl_verify() -> bool:
|
|
return _fc_settings.get_ssl_verify()
|
|
|
|
|
|
def _get_ssl_context():
|
|
from silo_client._ssl import build_ssl_context
|
|
|
|
return build_ssl_context(_fc_settings.get_ssl_verify(), _fc_settings.get_ssl_cert_path())
|
|
|
|
|
|
def _get_auth_headers() -> Dict[str, str]:
|
|
token = _fc_settings.get_api_token()
|
|
if not token:
|
|
token = os.environ.get("SILO_API_TOKEN", "")
|
|
if token:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
return {}
|
|
|
|
|
|
def _save_auth_info(username: str, role: str = "", source: str = "", token: str = ""):
|
|
_fc_settings.save_auth(username, role, source, token)
|
|
|
|
|
|
def _clear_auth():
|
|
_fc_settings.clear_auth()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Server mode tracking
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_server_mode = "offline" # "normal" | "read-only" | "degraded" | "offline"
|
|
|
|
|
|
def _fetch_server_mode() -> str:
|
|
"""Fetch server mode from the /ready endpoint.
|
|
|
|
Returns one of: "normal", "read-only", "degraded", "offline".
|
|
"""
|
|
api_url = _get_api_url().rstrip("/")
|
|
base_url = api_url[:-4] if api_url.endswith("/api") else api_url
|
|
url = f"{base_url}/ready"
|
|
try:
|
|
req = urllib.request.Request(url, method="GET")
|
|
resp = urllib.request.urlopen(req, context=_get_ssl_context(), timeout=10)
|
|
body = resp.read(4096).decode("utf-8", errors="replace")
|
|
data = json.loads(body)
|
|
status = data.get("status", "")
|
|
if status in ("ok", "ready"):
|
|
return "normal"
|
|
if status in ("read-only", "read_only", "readonly"):
|
|
return "read-only"
|
|
if status in ("degraded",):
|
|
return "degraded"
|
|
# Unknown status but server responded — treat as normal
|
|
return "normal"
|
|
except Exception:
|
|
return "offline"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Icon helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ICON_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", "icons")
|
|
|
|
|
|
def _icon(name):
|
|
"""Get icon path by name."""
|
|
if _ICON_DIR:
|
|
path = os.path.join(_ICON_DIR, f"silo-{name}.svg")
|
|
if os.path.exists(path):
|
|
return path
|
|
return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FreeCAD-specific path utilities
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_projects_dir() -> Path:
|
|
"""Get the projects directory."""
|
|
projects_dir = Path(SILO_PROJECTS_DIR)
|
|
projects_dir.mkdir(parents=True, exist_ok=True)
|
|
return projects_dir
|
|
|
|
|
|
def get_cad_file_path(part_number: str, description: str = "") -> Path:
|
|
"""Generate canonical file path for a CAD file.
|
|
|
|
Path format: ~/projects/cad/{category_code}_{category_name}/{part_number}_{description}.FCStd
|
|
"""
|
|
category, _ = parse_part_number(part_number)
|
|
folder_name = get_category_folder_name(category)
|
|
|
|
if description:
|
|
filename = f"{part_number}_{sanitize_filename(description)}.FCStd"
|
|
else:
|
|
filename = f"{part_number}.FCStd"
|
|
|
|
return get_projects_dir() / "cad" / folder_name / filename
|
|
|
|
|
|
def find_file_by_part_number(part_number: str) -> Optional[Path]:
|
|
"""Find existing CAD file for a part number."""
|
|
category, _ = parse_part_number(part_number)
|
|
folder_name = get_category_folder_name(category)
|
|
cad_dir = get_projects_dir() / "cad" / folder_name
|
|
|
|
if cad_dir.exists():
|
|
matches = list(cad_dir.glob(f"{part_number}*.FCStd"))
|
|
if matches:
|
|
return matches[0]
|
|
|
|
base_cad_dir = get_projects_dir() / "cad"
|
|
if base_cad_dir.exists():
|
|
for subdir in base_cad_dir.iterdir():
|
|
if subdir.is_dir():
|
|
matches = list(subdir.glob(f"{part_number}*.FCStd"))
|
|
if matches:
|
|
return matches[0]
|
|
|
|
return None
|
|
|
|
|
|
def search_local_files(search_term: str = "", category_filter: str = "") -> list:
|
|
"""Search for CAD files in local cad directory."""
|
|
results = []
|
|
cad_dir = get_projects_dir() / "cad"
|
|
if not cad_dir.exists():
|
|
return results
|
|
|
|
search_lower = search_term.lower()
|
|
|
|
for category_dir in cad_dir.iterdir():
|
|
if not category_dir.is_dir():
|
|
continue
|
|
|
|
folder_name = category_dir.name
|
|
category_code = folder_name.split("_")[0] if "_" in folder_name else folder_name
|
|
|
|
if category_filter and category_code.upper() != category_filter.upper():
|
|
continue
|
|
|
|
for fcstd_file in category_dir.glob("*.FCStd"):
|
|
filename = fcstd_file.stem
|
|
parts = filename.split("_", 1)
|
|
part_number = parts[0]
|
|
description = parts[1].replace("_", " ") if len(parts) > 1 else ""
|
|
|
|
if search_term:
|
|
searchable = f"{part_number} {description}".lower()
|
|
if search_lower not in searchable:
|
|
continue
|
|
|
|
try:
|
|
from datetime import datetime
|
|
|
|
mtime = fcstd_file.stat().st_mtime
|
|
modified = datetime.fromtimestamp(mtime).isoformat()
|
|
except Exception:
|
|
modified = None
|
|
|
|
results.append(
|
|
{
|
|
"path": str(fcstd_file),
|
|
"part_number": part_number,
|
|
"description": description,
|
|
"category": category_code,
|
|
"modified": modified,
|
|
"source": "local",
|
|
}
|
|
)
|
|
|
|
results.sort(key=lambda x: x.get("modified") or "", reverse=True)
|
|
return results
|
|
|
|
|
|
def _safe_float(val):
|
|
"""Convert float to JSON-safe value, handling NaN and Infinity."""
|
|
import math
|
|
|
|
if isinstance(val, float):
|
|
if math.isnan(val) or math.isinf(val):
|
|
return 0.0
|
|
return val
|
|
|
|
|
|
def collect_document_properties(doc) -> Dict[str, Any]:
|
|
"""Collect properties from all objects in a document."""
|
|
result = {
|
|
"_document_name": doc.Name,
|
|
"_file_name": doc.FileName or None,
|
|
"objects": {},
|
|
}
|
|
|
|
for obj in doc.Objects:
|
|
if obj.TypeId.startswith("App::") and obj.TypeId not in ("App::Part",):
|
|
continue
|
|
|
|
props = {"_object_type": obj.TypeId, "_label": obj.Label}
|
|
|
|
if hasattr(obj, "Placement"):
|
|
p = obj.Placement
|
|
props["placement"] = {
|
|
"position": {
|
|
"x": _safe_float(p.Base.x),
|
|
"y": _safe_float(p.Base.y),
|
|
"z": _safe_float(p.Base.z),
|
|
},
|
|
"rotation": {
|
|
"axis": {
|
|
"x": _safe_float(p.Rotation.Axis.x),
|
|
"y": _safe_float(p.Rotation.Axis.y),
|
|
"z": _safe_float(p.Rotation.Axis.z),
|
|
},
|
|
"angle": _safe_float(p.Rotation.Angle),
|
|
},
|
|
}
|
|
|
|
if hasattr(obj, "Shape") and obj.Shape:
|
|
try:
|
|
bbox = obj.Shape.BoundBox
|
|
props["bounding_box"] = {
|
|
"x_length": _safe_float(bbox.XLength),
|
|
"y_length": _safe_float(bbox.YLength),
|
|
"z_length": _safe_float(bbox.ZLength),
|
|
}
|
|
if hasattr(obj.Shape, "Volume"):
|
|
props["volume"] = _safe_float(obj.Shape.Volume)
|
|
except Exception:
|
|
pass
|
|
|
|
result["objects"][obj.Label] = props
|
|
|
|
return result
|
|
|
|
|
|
def set_silo_properties(obj, props: Dict[str, Any]):
|
|
"""Set Silo properties on FreeCAD object."""
|
|
for name, value in props.items():
|
|
if not hasattr(obj, name):
|
|
if isinstance(value, str):
|
|
obj.addProperty("App::PropertyString", name, "Silo", "")
|
|
elif isinstance(value, int):
|
|
obj.addProperty("App::PropertyInteger", name, "Silo", "")
|
|
setattr(obj, name, value)
|
|
|
|
|
|
def get_tracked_object(doc):
|
|
"""Find the primary tracked object in a document."""
|
|
for obj in doc.Objects:
|
|
if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber:
|
|
return obj
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Client and sync instances
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_client = SiloClient(_fc_settings)
|
|
|
|
|
|
class SiloSync:
|
|
"""Handles synchronization between FreeCAD and Silo."""
|
|
|
|
def __init__(self, client: SiloClient = None):
|
|
self.client = client or _client
|
|
|
|
def save_to_canonical_path(self, doc, force_rename: bool = False) -> Optional[Path]:
|
|
"""Save document to canonical path."""
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
return None
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
try:
|
|
item = self.client.get_item(part_number)
|
|
description = item.get("description", "")
|
|
new_path = get_cad_file_path(part_number, description)
|
|
new_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
existing_path = find_file_by_part_number(part_number)
|
|
current_path = Path(doc.FileName) if doc.FileName else None
|
|
|
|
# Use save() if already at the correct path, saveAs() only if path changes
|
|
if current_path and current_path == new_path:
|
|
doc.save()
|
|
elif (
|
|
existing_path
|
|
and existing_path != new_path
|
|
and (force_rename or current_path == existing_path)
|
|
):
|
|
doc.saveAs(str(new_path))
|
|
try:
|
|
existing_path.unlink()
|
|
except OSError:
|
|
pass
|
|
else:
|
|
doc.saveAs(str(new_path))
|
|
|
|
return new_path
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Save failed: {e}\n")
|
|
return None
|
|
|
|
def create_document_for_item(self, item: Dict[str, Any], save: bool = True):
|
|
"""Create a new FreeCAD document for a database item."""
|
|
part_number = item.get("part_number", "")
|
|
description = item.get("description", "")
|
|
item_type = item.get("item_type", "part")
|
|
|
|
if not part_number:
|
|
return None
|
|
|
|
doc = FreeCAD.newDocument(part_number)
|
|
safe_name = "_" + part_number
|
|
|
|
if item_type == "assembly":
|
|
# Create an Assembly object for assembly items (FreeCAD 1.0+)
|
|
try:
|
|
assembly_obj = doc.addObject("Assembly::AssemblyObject", safe_name)
|
|
assembly_obj.Label = part_number
|
|
set_silo_properties(
|
|
assembly_obj,
|
|
{
|
|
"SiloItemId": item.get("id", ""),
|
|
"SiloPartNumber": part_number,
|
|
"SiloRevision": item.get("current_revision", 1),
|
|
"SiloItemType": item_type,
|
|
},
|
|
)
|
|
except Exception as e:
|
|
# Fallback to App::Part if Assembly workbench not available
|
|
FreeCAD.Console.PrintWarning(
|
|
f"Assembly workbench not available, using App::Part: {e}\n"
|
|
)
|
|
part_obj = doc.addObject("App::Part", safe_name)
|
|
part_obj.Label = part_number
|
|
set_silo_properties(
|
|
part_obj,
|
|
{
|
|
"SiloItemId": item.get("id", ""),
|
|
"SiloPartNumber": part_number,
|
|
"SiloRevision": item.get("current_revision", 1),
|
|
"SiloItemType": item_type,
|
|
},
|
|
)
|
|
else:
|
|
# Create a Part container for non-assembly items
|
|
part_obj = doc.addObject("App::Part", safe_name)
|
|
part_obj.Label = part_number
|
|
|
|
set_silo_properties(
|
|
part_obj,
|
|
{
|
|
"SiloItemId": item.get("id", ""),
|
|
"SiloPartNumber": part_number,
|
|
"SiloRevision": item.get("current_revision", 1),
|
|
"SiloItemType": item_type,
|
|
},
|
|
)
|
|
|
|
# Add a Body for parts (not assemblies)
|
|
body_label = sanitize_filename(description) if description else "Body"
|
|
body = doc.addObject("PartDesign::Body", "_" + body_label)
|
|
body.Label = body_label
|
|
part_obj.addObject(body)
|
|
|
|
doc.recompute()
|
|
|
|
if save:
|
|
file_path = get_cad_file_path(part_number, description)
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
doc.saveAs(str(file_path))
|
|
|
|
return doc
|
|
|
|
def open_item(self, part_number: str):
|
|
"""Open or create item document."""
|
|
existing_path = find_file_by_part_number(part_number)
|
|
|
|
if existing_path and existing_path.exists():
|
|
return FreeCAD.openDocument(str(existing_path))
|
|
|
|
try:
|
|
item = self.client.get_item(part_number)
|
|
return self.create_document_for_item(item, save=True)
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Failed to open: {e}\n")
|
|
return None
|
|
|
|
def upload_file(
|
|
self, part_number: str, file_path: str, comment: str = "Auto-save"
|
|
) -> Optional[Dict]:
|
|
"""Upload file to MinIO."""
|
|
try:
|
|
doc = FreeCAD.openDocument(file_path)
|
|
if not doc:
|
|
return None
|
|
properties = collect_document_properties(doc)
|
|
FreeCAD.closeDocument(doc.Name)
|
|
|
|
return self.client._upload_file(part_number, file_path, properties, comment)
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Upload failed: {e}\n")
|
|
return None
|
|
|
|
def download_file(self, part_number: str) -> Optional[Path]:
|
|
"""Download latest file from MinIO."""
|
|
try:
|
|
item = self.client.get_item(part_number)
|
|
file_path = get_cad_file_path(part_number, item.get("description", ""))
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
revisions = self.client.get_revisions(part_number)
|
|
for rev in revisions:
|
|
if rev.get("file_key"):
|
|
if self.client._download_file(
|
|
part_number, rev["revision_number"], str(file_path)
|
|
):
|
|
return file_path
|
|
return None
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Download failed: {e}\n")
|
|
return None
|
|
|
|
|
|
_sync = SiloSync()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# kindred:// URL handler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def handle_kindred_url(url: str):
|
|
"""Handle a ``kindred://`` URL by opening the referenced item.
|
|
|
|
URL format::
|
|
|
|
kindred://item/{part_number}
|
|
kindred://item/{part_number}/revision/{rev_number}
|
|
|
|
Called from C++ ``MainWindow::processMessages()`` when a ``kindred://``
|
|
URL arrives via IPC, or from ``InitGui.py`` for cold-start URL arguments.
|
|
"""
|
|
from urllib.parse import urlparse
|
|
|
|
parsed = urlparse(url)
|
|
if parsed.scheme != "kindred":
|
|
return
|
|
# urlparse treats "kindred://item/PN-001" as netloc="item", path="/PN-001"
|
|
parts = [parsed.netloc] + [p for p in parsed.path.split("/") if p]
|
|
if len(parts) >= 2 and parts[0] == "item":
|
|
part_number = parts[1]
|
|
FreeCAD.Console.PrintMessage(f"Silo: Opening item {part_number} from kindred:// URL\n")
|
|
_sync.open_item(part_number)
|
|
|
|
|
|
# ============================================================================
|
|
# COMMANDS
|
|
# ============================================================================
|
|
|
|
|
|
class Silo_Open:
|
|
"""Open item - combined search and open dialog."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Open",
|
|
"ToolTip": "Search and open items (Ctrl+O)",
|
|
"Pixmap": _icon("open"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
dialog = QtGui.QDialog()
|
|
dialog.setWindowTitle("Silo - Open Item")
|
|
dialog.setMinimumWidth(700)
|
|
dialog.setMinimumHeight(500)
|
|
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
# Search row
|
|
search_layout = QtGui.QHBoxLayout()
|
|
search_input = QtGui.QLineEdit()
|
|
search_input.setPlaceholderText("Search by part number or description...")
|
|
search_layout.addWidget(search_input)
|
|
layout.addLayout(search_layout)
|
|
|
|
# Filters
|
|
filter_layout = QtGui.QHBoxLayout()
|
|
db_checkbox = QtGui.QCheckBox("Database")
|
|
db_checkbox.setChecked(True)
|
|
local_checkbox = QtGui.QCheckBox("Local Files")
|
|
local_checkbox.setChecked(True)
|
|
filter_layout.addWidget(db_checkbox)
|
|
filter_layout.addWidget(local_checkbox)
|
|
filter_layout.addStretch()
|
|
layout.addLayout(filter_layout)
|
|
|
|
# Results table
|
|
results_table = QtGui.QTableWidget()
|
|
results_table.setColumnCount(5)
|
|
results_table.setHorizontalHeaderLabels(
|
|
["Part Number", "Description", "Type", "Source", "Modified"]
|
|
)
|
|
results_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
|
|
results_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
|
|
results_table.horizontalHeader().setStretchLastSection(True)
|
|
results_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
|
|
layout.addWidget(results_table)
|
|
|
|
results_data = []
|
|
|
|
def do_search():
|
|
nonlocal results_data
|
|
search_term = search_input.text().strip()
|
|
results_data = []
|
|
results_table.setRowCount(0)
|
|
|
|
if db_checkbox.isChecked():
|
|
try:
|
|
for item in _client.list_items(search=search_term):
|
|
results_data.append(
|
|
{
|
|
"part_number": item.get("part_number", ""),
|
|
"description": item.get("description", ""),
|
|
"item_type": item.get("item_type", ""),
|
|
"source": "database",
|
|
"modified": item.get("updated_at", "")[:10]
|
|
if item.get("updated_at")
|
|
else "",
|
|
"path": None,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintWarning(f"DB search failed: {e}\n")
|
|
|
|
if local_checkbox.isChecked():
|
|
try:
|
|
for item in search_local_files(search_term):
|
|
existing = next(
|
|
(r for r in results_data if r["part_number"] == item["part_number"]),
|
|
None,
|
|
)
|
|
if existing:
|
|
existing["source"] = "both"
|
|
existing["path"] = item.get("path")
|
|
else:
|
|
results_data.append(
|
|
{
|
|
"part_number": item.get("part_number", ""),
|
|
"description": item.get("description", ""),
|
|
"item_type": "",
|
|
"source": "local",
|
|
"modified": item.get("modified", "")[:10]
|
|
if item.get("modified")
|
|
else "",
|
|
"path": item.get("path"),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintWarning(f"Local search failed: {e}\n")
|
|
|
|
results_table.setRowCount(len(results_data))
|
|
for row, data in enumerate(results_data):
|
|
results_table.setItem(row, 0, QtGui.QTableWidgetItem(data["part_number"]))
|
|
results_table.setItem(row, 1, QtGui.QTableWidgetItem(data["description"]))
|
|
results_table.setItem(row, 2, QtGui.QTableWidgetItem(data["item_type"]))
|
|
results_table.setItem(row, 3, QtGui.QTableWidgetItem(data["source"]))
|
|
results_table.setItem(row, 4, QtGui.QTableWidgetItem(data["modified"]))
|
|
results_table.resizeColumnsToContents()
|
|
|
|
_open_after_close = [None]
|
|
|
|
def open_selected():
|
|
selected = results_table.selectedItems()
|
|
if not selected:
|
|
return
|
|
row = selected[0].row()
|
|
_open_after_close[0] = dict(results_data[row])
|
|
dialog.accept()
|
|
|
|
search_input.textChanged.connect(lambda: do_search())
|
|
results_table.doubleClicked.connect(open_selected)
|
|
|
|
# Buttons
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
open_btn = QtGui.QPushButton("Open")
|
|
open_btn.clicked.connect(open_selected)
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
cancel_btn.clicked.connect(dialog.reject)
|
|
btn_layout.addStretch()
|
|
btn_layout.addWidget(open_btn)
|
|
btn_layout.addWidget(cancel_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
do_search()
|
|
dialog.exec_()
|
|
|
|
# Open the document AFTER the dialog has fully closed so that
|
|
# heavy document loads (especially Assembly files) don't run
|
|
# inside the dialog's nested event loop, which can cause crashes.
|
|
data = _open_after_close[0]
|
|
if data is not None:
|
|
if data.get("path"):
|
|
FreeCAD.openDocument(data["path"])
|
|
else:
|
|
_sync.open_item(data["part_number"])
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
class Silo_New:
|
|
"""Create new item with part number."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "New",
|
|
"ToolTip": "Create new item (Ctrl+N)",
|
|
"Pixmap": _icon("new"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
from schema_form import SchemaFormDialog
|
|
|
|
sel = FreeCADGui.Selection.getSelection()
|
|
|
|
dlg = SchemaFormDialog(_client, parent=FreeCADGui.getMainWindow())
|
|
|
|
# Pre-fill description from selected object
|
|
if sel:
|
|
dlg._desc_edit.setText(sel[0].Label)
|
|
|
|
result = dlg.exec_and_create()
|
|
if result is None:
|
|
return
|
|
|
|
part_number = result["part_number"]
|
|
form_data = result.get("_form_data", {})
|
|
selected_projects = form_data.get("projects") or []
|
|
|
|
try:
|
|
if sel:
|
|
# Tag selected object
|
|
obj = sel[0]
|
|
set_silo_properties(
|
|
obj,
|
|
{
|
|
"SiloItemId": result.get("id", ""),
|
|
"SiloPartNumber": part_number,
|
|
"SiloRevision": 1,
|
|
},
|
|
)
|
|
obj.Label = part_number
|
|
_sync.save_to_canonical_path(FreeCAD.ActiveDocument, force_rename=True)
|
|
else:
|
|
# Create new document
|
|
_sync.create_document_for_item(result, save=True)
|
|
|
|
msg = f"Part number: {part_number}"
|
|
if selected_projects:
|
|
msg += f"\nTagged with projects: {', '.join(selected_projects)}"
|
|
|
|
FreeCAD.Console.PrintMessage(f"Created: {part_number}\n")
|
|
QtGui.QMessageBox.information(None, "Item Created", msg)
|
|
|
|
except Exception as e:
|
|
QtGui.QMessageBox.critical(None, "Error", str(e))
|
|
|
|
def IsActive(self):
|
|
return _server_mode == "normal"
|
|
|
|
|
|
class Silo_Save:
|
|
"""Save locally and upload to MinIO."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Save",
|
|
"ToolTip": "Save locally and upload to MinIO (Ctrl+S)",
|
|
"Pixmap": _icon("save"),
|
|
}
|
|
|
|
def Activated(self):
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
|
|
# If not tracked, just do a regular FreeCAD save
|
|
if not obj:
|
|
if doc.FileName:
|
|
doc.save()
|
|
FreeCAD.Console.PrintMessage(f"Saved: {doc.FileName}\n")
|
|
else:
|
|
FreeCADGui.runCommand("Std_SaveAs", 0)
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
# Check if document has unsaved changes
|
|
gui_doc = FreeCADGui.getDocument(doc.Name)
|
|
is_modified = gui_doc.Modified if gui_doc else True
|
|
|
|
if gui_doc and not is_modified and doc.FileName:
|
|
FreeCAD.Console.PrintMessage("No changes to save.\n")
|
|
return
|
|
|
|
# Collect properties BEFORE saving to avoid dirtying the document
|
|
# (accessing Shape properties can trigger recompute)
|
|
properties = collect_document_properties(doc)
|
|
|
|
# Save locally
|
|
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
|
|
if not file_path:
|
|
# Fallback to regular save if canonical path fails
|
|
if doc.FileName:
|
|
doc.save()
|
|
file_path = Path(doc.FileName)
|
|
else:
|
|
FreeCAD.Console.PrintError("Could not determine save path\n")
|
|
return
|
|
|
|
# Force clear modified flag if save succeeded (needed for assemblies)
|
|
if gui_doc and gui_doc.Modified:
|
|
try:
|
|
gui_doc.Modified = False
|
|
except Exception:
|
|
pass
|
|
|
|
FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
|
|
|
|
# Try to upload to MinIO
|
|
try:
|
|
result = _client._upload_file(part_number, str(file_path), properties, "Auto-save")
|
|
|
|
new_rev = result["revision_number"]
|
|
FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n")
|
|
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n")
|
|
FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None and _server_mode == "normal"
|
|
|
|
|
|
class Silo_Commit:
|
|
"""Save as new revision with comment."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Commit",
|
|
"ToolTip": "Save as new revision with comment (Ctrl+Shift+S)",
|
|
"Pixmap": _icon("commit"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
FreeCAD.Console.PrintError("No tracked object. Use 'New' to register first.\n")
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
comment, ok = QtGui.QInputDialog.getText(None, "Commit", "Revision comment:")
|
|
if not ok:
|
|
return
|
|
|
|
# Collect properties BEFORE saving to avoid dirtying the document
|
|
properties = collect_document_properties(doc)
|
|
|
|
try:
|
|
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
|
|
if not file_path:
|
|
return
|
|
|
|
result = _client._upload_file(part_number, str(file_path), properties, comment)
|
|
|
|
new_rev = result["revision_number"]
|
|
FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n")
|
|
|
|
except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Commit failed: {e}\n")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None and _server_mode == "normal"
|
|
|
|
|
|
def _check_pull_conflicts(part_number, local_path, doc=None):
|
|
"""Check for conflicts between local file and server state.
|
|
|
|
Returns a list of conflict description strings, or an empty list if clean.
|
|
"""
|
|
conflicts = []
|
|
|
|
# Check for unsaved changes in an open document
|
|
if doc is not None:
|
|
gui_doc = FreeCADGui.getDocument(doc.Name) if doc.Name else None
|
|
if gui_doc and gui_doc.Modified:
|
|
conflicts.append("Document has unsaved local changes.")
|
|
|
|
# Check local revision vs server latest
|
|
if doc is not None:
|
|
obj = get_tracked_object(doc)
|
|
if obj and hasattr(obj, "SiloRevision"):
|
|
local_rev = getattr(obj, "SiloRevision", 0)
|
|
latest = _client.latest_file_revision(part_number)
|
|
if latest and local_rev and latest["revision_number"] > local_rev:
|
|
conflicts.append(
|
|
f"Local file is at revision {local_rev}, "
|
|
f"server has revision {latest['revision_number']}."
|
|
)
|
|
|
|
# Check local file mtime vs server timestamp
|
|
if local_path and local_path.exists():
|
|
import datetime
|
|
|
|
local_mtime = datetime.datetime.fromtimestamp(
|
|
local_path.stat().st_mtime, tz=datetime.timezone.utc
|
|
)
|
|
try:
|
|
item = _client.get_item(part_number)
|
|
server_updated = item.get("updated_at", "")
|
|
if server_updated:
|
|
# Parse ISO format timestamp
|
|
server_dt = datetime.datetime.fromisoformat(server_updated.replace("Z", "+00:00"))
|
|
if server_dt > local_mtime:
|
|
conflicts.append("Server version is newer than local file.")
|
|
except Exception:
|
|
pass
|
|
|
|
return conflicts
|
|
|
|
|
|
class SiloPullDialog:
|
|
"""Dialog for selecting which revision to pull."""
|
|
|
|
def __init__(self, part_number, revisions, parent=None):
|
|
from PySide import QtCore, QtGui
|
|
|
|
self._selected_revision = None
|
|
|
|
self._dialog = QtGui.QDialog(parent)
|
|
self._dialog.setWindowTitle(f"Pull - {part_number}")
|
|
self._dialog.setMinimumWidth(600)
|
|
self._dialog.setMinimumHeight(350)
|
|
|
|
layout = QtGui.QVBoxLayout(self._dialog)
|
|
|
|
info = QtGui.QLabel(f"Select a revision to download for {part_number}:")
|
|
layout.addWidget(info)
|
|
|
|
# Revision table
|
|
self._table = QtGui.QTableWidget()
|
|
self._table.setColumnCount(5)
|
|
self._table.setHorizontalHeaderLabels(["Rev", "Date", "Comment", "Status", "File"])
|
|
self._table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
|
|
self._table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
|
|
self._table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
|
|
self._table.verticalHeader().setVisible(False)
|
|
header = self._table.horizontalHeader()
|
|
header.setStretchLastSection(True)
|
|
|
|
# Populate rows
|
|
file_revisions = []
|
|
self._table.setRowCount(len(revisions))
|
|
for i, rev in enumerate(revisions):
|
|
rev_num = rev.get("revision_number", "")
|
|
date = rev.get("created_at", "")[:16].replace("T", " ")
|
|
comment = rev.get("comment", "")
|
|
status = rev.get("status", "")
|
|
has_file = "\u2713" if rev.get("file_key") else ""
|
|
|
|
self._table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev_num)))
|
|
self._table.setItem(i, 1, QtGui.QTableWidgetItem(date))
|
|
self._table.setItem(i, 2, QtGui.QTableWidgetItem(comment))
|
|
self._table.setItem(i, 3, QtGui.QTableWidgetItem(status))
|
|
file_item = QtGui.QTableWidgetItem(has_file)
|
|
file_item.setTextAlignment(QtCore.Qt.AlignCenter)
|
|
self._table.setItem(i, 4, file_item)
|
|
|
|
if rev.get("file_key"):
|
|
file_revisions.append(i)
|
|
|
|
self._table.resizeColumnsToContents()
|
|
layout.addWidget(self._table)
|
|
|
|
# Pre-select the latest revision with a file
|
|
if file_revisions:
|
|
self._table.selectRow(file_revisions[0])
|
|
|
|
# Store revision data for lookup
|
|
self._revisions = revisions
|
|
|
|
# Buttons
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
btn_layout.addStretch()
|
|
download_btn = QtGui.QPushButton("Download")
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
download_btn.clicked.connect(self._on_download)
|
|
cancel_btn.clicked.connect(self._dialog.reject)
|
|
self._table.doubleClicked.connect(self._on_download)
|
|
btn_layout.addWidget(download_btn)
|
|
btn_layout.addWidget(cancel_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
def _on_download(self):
|
|
row = self._table.currentRow()
|
|
if row < 0:
|
|
return
|
|
rev = self._revisions[row]
|
|
if not rev.get("file_key"):
|
|
from PySide import QtGui
|
|
|
|
QtGui.QMessageBox.information(
|
|
self._dialog, "Pull", "Selected revision has no file attached."
|
|
)
|
|
return
|
|
self._selected_revision = rev
|
|
self._dialog.accept()
|
|
|
|
def exec_(self):
|
|
if self._dialog.exec_() == 1: # QDialog.Accepted
|
|
return self._selected_revision
|
|
return None
|
|
|
|
|
|
class Silo_Pull:
|
|
"""Download from MinIO / sync from database."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Pull",
|
|
"ToolTip": "Download from MinIO with revision selection",
|
|
"Pixmap": _icon("pull"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
part_number = None
|
|
obj = None
|
|
|
|
if doc:
|
|
obj = get_tracked_object(doc)
|
|
if obj:
|
|
part_number = obj.SiloPartNumber
|
|
|
|
if not part_number:
|
|
part_number, ok = QtGui.QInputDialog.getText(None, "Pull", "Part number:")
|
|
if not ok or not part_number:
|
|
return
|
|
part_number = part_number.strip().upper()
|
|
|
|
# Fetch revisions from server
|
|
try:
|
|
revisions = _client.get_revisions(part_number)
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Pull", f"Cannot reach server: {e}")
|
|
return
|
|
|
|
existing_local = find_file_by_part_number(part_number)
|
|
|
|
# If no revisions have files, fall back to create-from-database
|
|
has_any_file = any(r.get("file_key") for r in revisions)
|
|
|
|
if not has_any_file:
|
|
if existing_local:
|
|
FreeCAD.Console.PrintMessage(f"Opening existing local file: {existing_local}\n")
|
|
FreeCAD.openDocument(str(existing_local))
|
|
else:
|
|
try:
|
|
item = _client.get_item(part_number)
|
|
new_doc = _sync.create_document_for_item(item, save=True)
|
|
if new_doc:
|
|
FreeCAD.Console.PrintMessage(f"Created local file for {part_number}\n")
|
|
else:
|
|
QtGui.QMessageBox.warning(
|
|
None,
|
|
"Pull",
|
|
f"Failed to create document for {part_number}",
|
|
)
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Pull", f"Failed: {e}")
|
|
return
|
|
|
|
# Conflict detection
|
|
conflicts = _check_pull_conflicts(part_number, existing_local, doc)
|
|
if conflicts:
|
|
detail = "\n".join(f" - {c}" for c in conflicts)
|
|
reply = QtGui.QMessageBox.warning(
|
|
None,
|
|
"Pull - Conflicts Detected",
|
|
f"Potential conflicts found:\n{detail}\n\n"
|
|
"Download anyway and overwrite local file?",
|
|
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
|
|
)
|
|
if reply != QtGui.QMessageBox.Yes:
|
|
return
|
|
|
|
# Show revision selection dialog
|
|
dlg = SiloPullDialog(part_number, revisions)
|
|
selected = dlg.exec_()
|
|
if selected is None:
|
|
return
|
|
|
|
rev_num = selected["revision_number"]
|
|
|
|
# Determine destination path
|
|
try:
|
|
item = _client.get_item(part_number)
|
|
except Exception:
|
|
item = {}
|
|
dest_path = get_cad_file_path(part_number, item.get("description", ""))
|
|
dest_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Download with progress
|
|
progress = QtGui.QProgressDialog(
|
|
f"Downloading {part_number} rev {rev_num}...", "Cancel", 0, 100
|
|
)
|
|
progress.setWindowModality(QtCore.Qt.WindowModal)
|
|
progress.setMinimumDuration(0)
|
|
progress.setValue(0)
|
|
|
|
def on_progress(downloaded, total):
|
|
if progress.wasCanceled():
|
|
return
|
|
if total > 0:
|
|
pct = int(downloaded * 100 / total)
|
|
progress.setValue(min(pct, 99))
|
|
else:
|
|
# Indeterminate - pulse between 0-90
|
|
progress.setValue(min(int(downloaded / 1024) % 90, 89))
|
|
QtGui.QApplication.processEvents()
|
|
|
|
try:
|
|
ok = _client._download_file(
|
|
part_number, rev_num, str(dest_path), progress_callback=on_progress
|
|
)
|
|
except Exception as e:
|
|
progress.close()
|
|
QtGui.QMessageBox.warning(None, "Pull", f"Download failed: {e}")
|
|
return
|
|
|
|
progress.setValue(100)
|
|
progress.close()
|
|
|
|
if not ok:
|
|
QtGui.QMessageBox.warning(None, "Pull", "Download failed.")
|
|
return
|
|
|
|
FreeCAD.Console.PrintMessage(f"Pulled revision {rev_num} of {part_number}\n")
|
|
|
|
# Close existing document if open, then reopen
|
|
if doc and doc.FileName == str(dest_path):
|
|
FreeCAD.closeDocument(doc.Name)
|
|
FreeCAD.openDocument(str(dest_path))
|
|
|
|
# Update SiloRevision property on the tracked object
|
|
new_doc = FreeCAD.ActiveDocument
|
|
if new_doc:
|
|
new_obj = get_tracked_object(new_doc)
|
|
if new_obj and hasattr(new_obj, "SiloRevision"):
|
|
new_obj.SiloRevision = rev_num
|
|
new_doc.save()
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
class Silo_Push:
|
|
"""Upload local files to MinIO."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Push",
|
|
"ToolTip": "Upload local files that aren't in MinIO",
|
|
"Pixmap": _icon("push"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from datetime import datetime, timezone
|
|
|
|
from PySide import QtGui
|
|
|
|
# Find files that need uploading (no server file, or local is newer)
|
|
local_files = search_local_files()
|
|
unuploaded = []
|
|
|
|
for lf in local_files:
|
|
pn = lf["part_number"]
|
|
try:
|
|
_client.get_item(pn) # Check if in DB
|
|
server_rev = _client.latest_file_revision(pn)
|
|
if not server_rev:
|
|
# No file on server at all
|
|
unuploaded.append(lf)
|
|
else:
|
|
# Compare local mtime against server revision timestamp
|
|
try:
|
|
local_mtime = os.path.getmtime(lf["path"])
|
|
server_time_str = server_rev.get("created_at", "")
|
|
if server_time_str:
|
|
server_dt = datetime.fromisoformat(
|
|
server_time_str.replace("Z", "+00:00")
|
|
)
|
|
local_dt = datetime.fromtimestamp(local_mtime, tz=timezone.utc)
|
|
if local_dt > server_dt:
|
|
unuploaded.append(lf)
|
|
else:
|
|
# Can't parse server time, assume needs upload
|
|
unuploaded.append(lf)
|
|
except Exception:
|
|
# On any comparison error, include it
|
|
unuploaded.append(lf)
|
|
except Exception:
|
|
pass # Not in DB, skip
|
|
|
|
if not unuploaded:
|
|
QtGui.QMessageBox.information(None, "Push", "All local files are already uploaded.")
|
|
return
|
|
|
|
msg = f"Found {len(unuploaded)} files to upload:\n\n"
|
|
for item in unuploaded[:10]:
|
|
msg += f" {item['part_number']}\n"
|
|
if len(unuploaded) > 10:
|
|
msg += f" ... and {len(unuploaded) - 10} more\n"
|
|
msg += "\nUpload?"
|
|
|
|
reply = QtGui.QMessageBox.question(
|
|
None, "Push", msg, QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
|
|
)
|
|
if reply != QtGui.QMessageBox.Yes:
|
|
return
|
|
|
|
uploaded = 0
|
|
for item in unuploaded:
|
|
result = _sync.upload_file(item["part_number"], item["path"], "Synced from local")
|
|
if result:
|
|
uploaded += 1
|
|
|
|
QtGui.QMessageBox.information(None, "Push", f"Uploaded {uploaded} files.")
|
|
|
|
def IsActive(self):
|
|
return _server_mode == "normal"
|
|
|
|
|
|
class Silo_Info:
|
|
"""Show item status and revision history."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Info",
|
|
"ToolTip": "Show item status and revision history",
|
|
"Pixmap": _icon("info"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
FreeCAD.Console.PrintError("No tracked object\n")
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
try:
|
|
item = _client.get_item(part_number)
|
|
revisions = _client.get_revisions(part_number)
|
|
|
|
# Get projects for item
|
|
try:
|
|
projects = _client.get_item_projects(part_number)
|
|
project_codes = [p.get("code", "") for p in projects if p.get("code")]
|
|
except Exception:
|
|
project_codes = []
|
|
|
|
msg = f"<h3>{part_number}</h3>"
|
|
msg += f"<p><b>Type:</b> {item.get('item_type', '-')}</p>"
|
|
msg += f"<p><b>Description:</b> {item.get('description', '-')}</p>"
|
|
msg += (
|
|
f"<p><b>Projects:</b> {', '.join(project_codes) if project_codes else 'None'}</p>"
|
|
)
|
|
msg += f"<p><b>Current Revision:</b> {item.get('current_revision', 1)}</p>"
|
|
msg += f"<p><b>Local Revision:</b> {getattr(obj, 'SiloRevision', '-')}</p>"
|
|
|
|
has_file, _ = _client.has_file(part_number)
|
|
msg += f"<p><b>File in MinIO:</b> {'Yes' if has_file else 'No'}</p>"
|
|
|
|
# Show current revision status
|
|
if revisions:
|
|
current_status = revisions[0].get("status", "draft")
|
|
current_labels = revisions[0].get("labels", [])
|
|
msg += f"<p><b>Current Status:</b> {current_status}</p>"
|
|
if current_labels:
|
|
msg += f"<p><b>Labels:</b> {', '.join(current_labels)}</p>"
|
|
|
|
msg += "<h4>Revision History</h4><table border='1' cellpadding='4'>"
|
|
msg += "<tr><th>Rev</th><th>Status</th><th>Date</th><th>File</th><th>Comment</th></tr>"
|
|
for rev in revisions:
|
|
file_icon = "✓" if rev.get("file_key") else "-"
|
|
comment = rev.get("comment", "") or "-"
|
|
date = rev.get("created_at", "")[:10]
|
|
status = rev.get("status", "draft")
|
|
msg += f"<tr><td>{rev['revision_number']}</td><td>{status}</td><td>{date}</td><td>{file_icon}</td><td>{comment}</td></tr>"
|
|
msg += "</table>"
|
|
|
|
dialog = QtGui.QMessageBox()
|
|
dialog.setWindowTitle("Item Info")
|
|
dialog.setTextFormat(QtGui.Qt.RichText)
|
|
dialog.setText(msg)
|
|
dialog.exec_()
|
|
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Info", f"Failed to get info: {e}")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None
|
|
|
|
|
|
class Silo_TagProjects:
|
|
"""Manage project tags for an item."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Tag Projects",
|
|
"ToolTip": "Add or remove project tags for an item",
|
|
"Pixmap": _icon("tag"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
FreeCAD.Console.PrintError("No tracked object\n")
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
try:
|
|
# Get current projects for item
|
|
current_projects = _client.get_item_projects(part_number)
|
|
current_codes = {p.get("code", "") for p in current_projects if p.get("code")}
|
|
|
|
# Get all available projects
|
|
all_projects = _client.get_projects()
|
|
all_codes = [p.get("code", "") for p in all_projects if p.get("code")]
|
|
|
|
if not all_codes:
|
|
QtGui.QMessageBox.information(
|
|
None,
|
|
"Tag Projects",
|
|
"No projects available. Create projects first.",
|
|
)
|
|
return
|
|
|
|
# Multi-select dialog
|
|
dialog = QtGui.QDialog()
|
|
dialog.setWindowTitle(f"Tag Projects for {part_number}")
|
|
dialog.setMinimumWidth(350)
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
label = QtGui.QLabel("Select projects to associate with this item:")
|
|
layout.addWidget(label)
|
|
|
|
list_widget = QtGui.QListWidget()
|
|
list_widget.setSelectionMode(QtGui.QAbstractItemView.MultiSelection)
|
|
for code in all_codes:
|
|
item = QtGui.QListWidgetItem(code)
|
|
list_widget.addItem(item)
|
|
if code in current_codes:
|
|
item.setSelected(True)
|
|
layout.addWidget(list_widget)
|
|
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
save_btn = QtGui.QPushButton("Save")
|
|
btn_layout.addWidget(cancel_btn)
|
|
btn_layout.addWidget(save_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
cancel_btn.clicked.connect(dialog.reject)
|
|
save_btn.clicked.connect(dialog.accept)
|
|
|
|
if dialog.exec_() == QtGui.QDialog.Accepted:
|
|
selected = [item.text() for item in list_widget.selectedItems()]
|
|
|
|
# Add new tags
|
|
to_add = [c for c in selected if c not in current_codes]
|
|
if to_add:
|
|
_client.add_item_projects(part_number, to_add)
|
|
|
|
# Note: removing tags would require a separate API call per project
|
|
# For simplicity, we just add new ones here
|
|
|
|
msg = f"Updated project tags for {part_number}"
|
|
if to_add:
|
|
msg += f"\nAdded: {', '.join(to_add)}"
|
|
|
|
QtGui.QMessageBox.information(None, "Tag Projects", msg)
|
|
FreeCAD.Console.PrintMessage(f"{msg}\n")
|
|
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Tag Projects", f"Failed: {e}")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None
|
|
|
|
|
|
class Silo_Rollback:
|
|
"""Rollback to a previous revision."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Rollback",
|
|
"ToolTip": "Rollback to a previous revision",
|
|
"Pixmap": _icon("rollback"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
FreeCAD.Console.PrintError("No tracked object\n")
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
try:
|
|
revisions = _client.get_revisions(part_number)
|
|
if len(revisions) < 2:
|
|
QtGui.QMessageBox.information(
|
|
None, "Rollback", "No previous revisions to rollback to."
|
|
)
|
|
return
|
|
|
|
# Build revision list for selection (exclude current/latest)
|
|
current_rev = revisions[0]["revision_number"]
|
|
prev_revisions = revisions[1:] # All except latest
|
|
|
|
# Create selection dialog
|
|
dialog = QtGui.QDialog()
|
|
dialog.setWindowTitle(f"Rollback {part_number}")
|
|
dialog.setMinimumWidth(500)
|
|
dialog.setMinimumHeight(300)
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
label = QtGui.QLabel(f"Select a revision to rollback to (current: Rev {current_rev}):")
|
|
layout.addWidget(label)
|
|
|
|
# Revision table
|
|
table = QtGui.QTableWidget()
|
|
table.setColumnCount(4)
|
|
table.setHorizontalHeaderLabels(["Rev", "Status", "Date", "Comment"])
|
|
table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
|
|
table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
|
|
table.setRowCount(len(prev_revisions))
|
|
table.horizontalHeader().setStretchLastSection(True)
|
|
|
|
for i, rev in enumerate(prev_revisions):
|
|
table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev["revision_number"])))
|
|
table.setItem(i, 1, QtGui.QTableWidgetItem(rev.get("status", "draft")))
|
|
table.setItem(i, 2, QtGui.QTableWidgetItem(rev.get("created_at", "")[:10]))
|
|
table.setItem(i, 3, QtGui.QTableWidgetItem(rev.get("comment", "") or ""))
|
|
|
|
table.resizeColumnsToContents()
|
|
layout.addWidget(table)
|
|
|
|
# Comment field
|
|
comment_label = QtGui.QLabel("Rollback comment (optional):")
|
|
layout.addWidget(comment_label)
|
|
comment_input = QtGui.QLineEdit()
|
|
comment_input.setPlaceholderText("Reason for rollback...")
|
|
layout.addWidget(comment_input)
|
|
|
|
# Buttons
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
rollback_btn = QtGui.QPushButton("Rollback")
|
|
btn_layout.addStretch()
|
|
btn_layout.addWidget(cancel_btn)
|
|
btn_layout.addWidget(rollback_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
selected_rev = [None]
|
|
|
|
def on_rollback():
|
|
selected = table.selectedItems()
|
|
if not selected:
|
|
QtGui.QMessageBox.warning(dialog, "Rollback", "Please select a revision")
|
|
return
|
|
selected_rev[0] = int(table.item(selected[0].row(), 0).text())
|
|
dialog.accept()
|
|
|
|
cancel_btn.clicked.connect(dialog.reject)
|
|
rollback_btn.clicked.connect(on_rollback)
|
|
|
|
if dialog.exec_() == QtGui.QDialog.Accepted and selected_rev[0]:
|
|
target_rev = selected_rev[0]
|
|
comment = comment_input.text().strip()
|
|
|
|
# Confirm
|
|
reply = QtGui.QMessageBox.question(
|
|
None,
|
|
"Confirm Rollback",
|
|
f"Create new revision by rolling back to Rev {target_rev}?\n\n"
|
|
"This will copy properties and file reference from the selected revision.",
|
|
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
|
|
)
|
|
if reply != QtGui.QMessageBox.Yes:
|
|
return
|
|
|
|
# Perform rollback
|
|
result = _client.rollback_revision(part_number, target_rev, comment)
|
|
new_rev = result["revision_number"]
|
|
|
|
FreeCAD.Console.PrintMessage(
|
|
f"Created revision {new_rev} (rollback from {target_rev})\n"
|
|
)
|
|
QtGui.QMessageBox.information(
|
|
None,
|
|
"Rollback Complete",
|
|
f"Created revision {new_rev} from rollback to Rev {target_rev}.\n\n"
|
|
"Use 'Pull' to download the rolled-back file.",
|
|
)
|
|
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Rollback", f"Failed: {e}")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None
|
|
|
|
|
|
class Silo_SetStatus:
|
|
"""Set revision status (draft, review, released, obsolete)."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Set Status",
|
|
"ToolTip": "Set the status of the current revision",
|
|
"Pixmap": _icon("status"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
FreeCAD.Console.PrintError("No tracked object\n")
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
local_rev = getattr(obj, "SiloRevision", 1)
|
|
|
|
try:
|
|
# Get current revision info
|
|
revisions = _client.get_revisions(part_number)
|
|
current_rev = revisions[0] if revisions else None
|
|
if not current_rev:
|
|
QtGui.QMessageBox.warning(None, "Set Status", "No revisions found")
|
|
return
|
|
|
|
current_status = current_rev.get("status", "draft")
|
|
rev_num = current_rev["revision_number"]
|
|
|
|
# Status selection
|
|
statuses = ["draft", "review", "released", "obsolete"]
|
|
status, ok = QtGui.QInputDialog.getItem(
|
|
None,
|
|
"Set Revision Status",
|
|
f"Set status for Rev {rev_num} (current: {current_status}):",
|
|
statuses,
|
|
statuses.index(current_status),
|
|
False,
|
|
)
|
|
|
|
if not ok or status == current_status:
|
|
return
|
|
|
|
# Update status
|
|
_client.update_revision(part_number, rev_num, status=status)
|
|
|
|
FreeCAD.Console.PrintMessage(f"Updated Rev {rev_num} status to '{status}'\n")
|
|
QtGui.QMessageBox.information(
|
|
None, "Status Updated", f"Revision {rev_num} status set to '{status}'"
|
|
)
|
|
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "Set Status", f"Failed: {e}")
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None
|
|
|
|
|
|
class Silo_Settings:
|
|
"""Configure Silo connection settings."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Settings",
|
|
"ToolTip": "Configure Silo API URL and SSL settings",
|
|
"Pixmap": _icon("info"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
param = FreeCAD.ParamGet(_PREF_GROUP)
|
|
|
|
dialog = QtGui.QDialog()
|
|
dialog.setWindowTitle("Silo Settings")
|
|
dialog.setMinimumWidth(450)
|
|
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
# URL
|
|
url_label = QtGui.QLabel("Silo API URL:")
|
|
layout.addWidget(url_label)
|
|
|
|
url_input = QtGui.QLineEdit()
|
|
url_input.setPlaceholderText("http://localhost:8080/api")
|
|
current_url = param.GetString("ApiUrl", "")
|
|
if current_url:
|
|
url_input.setText(current_url)
|
|
else:
|
|
env_url = os.environ.get("SILO_API_URL", "")
|
|
if env_url:
|
|
url_input.setText(env_url)
|
|
layout.addWidget(url_input)
|
|
|
|
url_hint = QtGui.QLabel(
|
|
"Full URL with path (e.g. http://localhost:8080/api) or just the "
|
|
"hostname (e.g. https://silo.kindred.internal) and /api is "
|
|
"appended automatically. Leave empty for SILO_API_URL env var."
|
|
)
|
|
url_hint.setWordWrap(True)
|
|
url_hint.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(url_hint)
|
|
|
|
layout.addSpacing(10)
|
|
|
|
# SSL
|
|
ssl_checkbox = QtGui.QCheckBox("Verify SSL certificates")
|
|
ssl_checkbox.setChecked(param.GetBool("SslVerify", True))
|
|
layout.addWidget(ssl_checkbox)
|
|
|
|
ssl_hint = QtGui.QLabel("Disable only for internal servers with self-signed certificates.")
|
|
ssl_hint.setWordWrap(True)
|
|
ssl_hint.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(ssl_hint)
|
|
|
|
layout.addSpacing(5)
|
|
|
|
# Custom CA certificate
|
|
cert_label = QtGui.QLabel("Custom CA certificate file:")
|
|
layout.addWidget(cert_label)
|
|
|
|
cert_row = QtGui.QHBoxLayout()
|
|
cert_input = QtGui.QLineEdit()
|
|
cert_input.setPlaceholderText("(Use system CA certificates)")
|
|
current_cert = param.GetString("SslCertPath", "")
|
|
if current_cert:
|
|
cert_input.setText(current_cert)
|
|
cert_browse = QtGui.QPushButton("Browse...")
|
|
cert_row.addWidget(cert_input)
|
|
cert_row.addWidget(cert_browse)
|
|
layout.addLayout(cert_row)
|
|
|
|
cert_hint = QtGui.QLabel(
|
|
"Path to a PEM/CRT file for internal CAs. Leave empty for system certificates only."
|
|
)
|
|
cert_hint.setWordWrap(True)
|
|
cert_hint.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(cert_hint)
|
|
|
|
def on_browse_cert():
|
|
path, _ = QtGui.QFileDialog.getOpenFileName(
|
|
dialog,
|
|
"Select CA Certificate",
|
|
os.path.dirname(cert_input.text()) or "/etc/ssl/certs",
|
|
"Certificates (*.pem *.crt *.cer);;All Files (*)",
|
|
)
|
|
if path:
|
|
cert_input.setText(path)
|
|
|
|
cert_browse.clicked.connect(on_browse_cert)
|
|
|
|
layout.addSpacing(10)
|
|
|
|
# Authentication section
|
|
auth_heading = QtGui.QLabel("<b>Authentication</b>")
|
|
auth_heading.setTextFormat(QtCore.Qt.RichText)
|
|
layout.addWidget(auth_heading)
|
|
|
|
auth_user = _get_auth_username()
|
|
auth_role = _get_auth_role()
|
|
auth_source = _get_auth_source()
|
|
has_token = bool(_get_auth_token())
|
|
|
|
if has_token and auth_user:
|
|
auth_parts = [f"Logged in as <b>{auth_user}</b>"]
|
|
if auth_role:
|
|
auth_parts.append(f"(role: {auth_role})")
|
|
if auth_source:
|
|
auth_parts.append(f"via {auth_source}")
|
|
auth_status_text = " ".join(auth_parts)
|
|
else:
|
|
auth_status_text = "Not logged in"
|
|
|
|
auth_status_lbl = QtGui.QLabel(auth_status_text)
|
|
auth_status_lbl.setTextFormat(QtCore.Qt.RichText)
|
|
layout.addWidget(auth_status_lbl)
|
|
|
|
# API token input
|
|
token_label = QtGui.QLabel("API Token:")
|
|
layout.addWidget(token_label)
|
|
|
|
token_row = QtGui.QHBoxLayout()
|
|
token_input = QtGui.QLineEdit()
|
|
token_input.setEchoMode(QtGui.QLineEdit.Password)
|
|
token_input.setPlaceholderText("silo_... (paste token or use Login)")
|
|
current_token = param.GetString("ApiToken", "")
|
|
if current_token:
|
|
token_input.setText(current_token)
|
|
token_row.addWidget(token_input)
|
|
|
|
token_show_btn = QtGui.QToolButton()
|
|
token_show_btn.setText("\U0001f441")
|
|
token_show_btn.setCheckable(True)
|
|
token_show_btn.setFixedSize(28, 28)
|
|
token_show_btn.setToolTip("Show/hide token")
|
|
|
|
def on_toggle_show(checked):
|
|
if checked:
|
|
token_input.setEchoMode(QtGui.QLineEdit.Normal)
|
|
else:
|
|
token_input.setEchoMode(QtGui.QLineEdit.Password)
|
|
|
|
token_show_btn.toggled.connect(on_toggle_show)
|
|
token_row.addWidget(token_show_btn)
|
|
layout.addLayout(token_row)
|
|
|
|
token_hint = QtGui.QLabel(
|
|
"Paste an API token generated from the Silo web UI, "
|
|
"or use Login in the Database Auth panel to create one "
|
|
"automatically. Tokens can also be set via the "
|
|
"SILO_API_TOKEN environment variable."
|
|
)
|
|
token_hint.setWordWrap(True)
|
|
token_hint.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(token_hint)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
clear_auth_btn = QtGui.QPushButton("Clear Token and Logout")
|
|
clear_auth_btn.setEnabled(has_token)
|
|
|
|
def on_clear_auth():
|
|
_clear_auth()
|
|
token_input.setText("")
|
|
auth_status_lbl.setText("Not logged in")
|
|
clear_auth_btn.setEnabled(False)
|
|
FreeCAD.Console.PrintMessage("Silo: API token and credentials cleared\n")
|
|
|
|
clear_auth_btn.clicked.connect(on_clear_auth)
|
|
layout.addWidget(clear_auth_btn)
|
|
|
|
layout.addSpacing(10)
|
|
|
|
# Current effective values (read-only)
|
|
cert_display = param.GetString("SslCertPath", "") or "(system defaults)"
|
|
if has_token and auth_user:
|
|
auth_display = f"{auth_user} ({auth_role or 'unknown role'})"
|
|
if auth_source:
|
|
auth_display += f" via {auth_source}"
|
|
elif has_token:
|
|
auth_display = "token configured (user unknown)"
|
|
else:
|
|
auth_display = "not configured"
|
|
status_label = QtGui.QLabel(
|
|
f"<b>Active URL:</b> {_get_api_url()}<br>"
|
|
f"<b>SSL verification:</b> {'enabled' if _get_ssl_verify() else 'disabled'}<br>"
|
|
f"<b>CA certificate:</b> {cert_display}<br>"
|
|
f"<b>Authentication:</b> {auth_display}"
|
|
)
|
|
status_label.setTextFormat(QtCore.Qt.RichText)
|
|
layout.addWidget(status_label)
|
|
|
|
layout.addStretch()
|
|
|
|
# Buttons
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
save_btn = QtGui.QPushButton("Save")
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
btn_layout.addStretch()
|
|
btn_layout.addWidget(save_btn)
|
|
btn_layout.addWidget(cancel_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
def on_save():
|
|
url = url_input.text().strip()
|
|
param.SetString("ApiUrl", url)
|
|
param.SetBool("SslVerify", ssl_checkbox.isChecked())
|
|
cert_path = cert_input.text().strip()
|
|
param.SetString("SslCertPath", cert_path)
|
|
# Save API token if changed
|
|
new_token = token_input.text().strip()
|
|
old_token = param.GetString("ApiToken", "")
|
|
if new_token != old_token:
|
|
param.SetString("ApiToken", new_token)
|
|
if new_token and not old_token:
|
|
FreeCAD.Console.PrintMessage("Silo: API token configured\n")
|
|
elif not new_token and old_token:
|
|
_clear_auth()
|
|
FreeCAD.Console.PrintMessage("Silo: API token removed\n")
|
|
else:
|
|
FreeCAD.Console.PrintMessage("Silo: API token updated\n")
|
|
FreeCAD.Console.PrintMessage(
|
|
f"Silo settings saved. URL: {_get_api_url()}, "
|
|
f"SSL verify: {_get_ssl_verify()}, "
|
|
f"Cert: {cert_path or '(system)'}\n"
|
|
)
|
|
dialog.accept()
|
|
|
|
save_btn.clicked.connect(on_save)
|
|
cancel_btn.clicked.connect(dialog.reject)
|
|
|
|
dialog.exec_()
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
class Silo_BOM:
|
|
"""View and manage Bill of Materials for the current item."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "BOM",
|
|
"ToolTip": "View and manage Bill of Materials",
|
|
"Pixmap": _icon("bom"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
doc = FreeCAD.ActiveDocument
|
|
if not doc:
|
|
FreeCAD.Console.PrintError("No active document\n")
|
|
return
|
|
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
reply = QtGui.QMessageBox.question(
|
|
None,
|
|
"BOM",
|
|
"This document is not registered with Silo.\n\nRegister it now?",
|
|
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
|
|
)
|
|
if reply != QtGui.QMessageBox.Yes:
|
|
return
|
|
FreeCADGui.runCommand("Silo_New")
|
|
obj = get_tracked_object(doc)
|
|
if not obj:
|
|
return
|
|
|
|
part_number = obj.SiloPartNumber
|
|
|
|
try:
|
|
item = _client.get_item(part_number)
|
|
except Exception as e:
|
|
QtGui.QMessageBox.warning(None, "BOM", f"Failed to get item info:\n{e}")
|
|
return
|
|
|
|
# Build the dialog
|
|
dialog = QtGui.QDialog()
|
|
dialog.setWindowTitle(f"BOM - {part_number}")
|
|
dialog.setMinimumWidth(750)
|
|
dialog.setMinimumHeight(450)
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
# Item header
|
|
header = QtGui.QLabel(f"<b>{part_number}</b> - {item.get('description', '')}")
|
|
layout.addWidget(header)
|
|
|
|
# Tab widget
|
|
tabs = QtGui.QTabWidget()
|
|
layout.addWidget(tabs)
|
|
|
|
# ── Tab 1: BOM (children of this item) ──
|
|
bom_widget = QtGui.QWidget()
|
|
bom_layout = QtGui.QVBoxLayout(bom_widget)
|
|
|
|
bom_table = QtGui.QTableWidget()
|
|
bom_table.setColumnCount(7)
|
|
bom_table.setHorizontalHeaderLabels(
|
|
["Part Number", "Description", "Type", "Qty", "Unit", "Ref Des", "Rev"]
|
|
)
|
|
bom_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
|
|
bom_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
|
|
bom_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
|
|
bom_table.horizontalHeader().setStretchLastSection(True)
|
|
bom_layout.addWidget(bom_table)
|
|
|
|
# BOM button bar
|
|
bom_btn_layout = QtGui.QHBoxLayout()
|
|
add_btn = QtGui.QPushButton("Add")
|
|
edit_btn = QtGui.QPushButton("Edit")
|
|
remove_btn = QtGui.QPushButton("Remove")
|
|
bom_btn_layout.addWidget(add_btn)
|
|
bom_btn_layout.addWidget(edit_btn)
|
|
bom_btn_layout.addWidget(remove_btn)
|
|
bom_btn_layout.addStretch()
|
|
bom_layout.addLayout(bom_btn_layout)
|
|
|
|
tabs.addTab(bom_widget, "BOM")
|
|
|
|
# ── Tab 2: Where Used (parents of this item) ──
|
|
wu_widget = QtGui.QWidget()
|
|
wu_layout = QtGui.QVBoxLayout(wu_widget)
|
|
|
|
wu_table = QtGui.QTableWidget()
|
|
wu_table.setColumnCount(5)
|
|
wu_table.setHorizontalHeaderLabels(["Parent Part Number", "Type", "Qty", "Unit", "Ref Des"])
|
|
wu_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
|
|
wu_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
|
|
wu_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
|
|
wu_table.horizontalHeader().setStretchLastSection(True)
|
|
wu_layout.addWidget(wu_table)
|
|
|
|
tabs.addTab(wu_widget, "Where Used")
|
|
|
|
# ── Data loading ──
|
|
|
|
bom_data = []
|
|
|
|
def load_bom():
|
|
nonlocal bom_data
|
|
try:
|
|
bom_data = _client.get_bom(part_number)
|
|
except Exception as exc:
|
|
FreeCAD.Console.PrintWarning(f"BOM load error: {exc}\n")
|
|
bom_data = []
|
|
|
|
bom_table.setRowCount(len(bom_data))
|
|
for row, entry in enumerate(bom_data):
|
|
bom_table.setItem(
|
|
row, 0, QtGui.QTableWidgetItem(entry.get("child_part_number", ""))
|
|
)
|
|
bom_table.setItem(
|
|
row, 1, QtGui.QTableWidgetItem(entry.get("child_description", ""))
|
|
)
|
|
bom_table.setItem(row, 2, QtGui.QTableWidgetItem(entry.get("rel_type", "")))
|
|
qty = entry.get("quantity")
|
|
bom_table.setItem(
|
|
row, 3, QtGui.QTableWidgetItem(str(qty) if qty is not None else "")
|
|
)
|
|
bom_table.setItem(row, 4, QtGui.QTableWidgetItem(entry.get("unit") or ""))
|
|
ref_des = entry.get("reference_designators") or []
|
|
bom_table.setItem(row, 5, QtGui.QTableWidgetItem(", ".join(ref_des)))
|
|
bom_table.setItem(
|
|
row,
|
|
6,
|
|
QtGui.QTableWidgetItem(str(entry.get("effective_revision", ""))),
|
|
)
|
|
bom_table.resizeColumnsToContents()
|
|
|
|
def load_where_used():
|
|
try:
|
|
wu_data = _client.get_bom_where_used(part_number)
|
|
except Exception as exc:
|
|
FreeCAD.Console.PrintWarning(f"Where-used load error: {exc}\n")
|
|
wu_data = []
|
|
|
|
wu_table.setRowCount(len(wu_data))
|
|
for row, entry in enumerate(wu_data):
|
|
wu_table.setItem(
|
|
row, 0, QtGui.QTableWidgetItem(entry.get("parent_part_number", ""))
|
|
)
|
|
wu_table.setItem(row, 1, QtGui.QTableWidgetItem(entry.get("rel_type", "")))
|
|
qty = entry.get("quantity")
|
|
wu_table.setItem(
|
|
row, 2, QtGui.QTableWidgetItem(str(qty) if qty is not None else "")
|
|
)
|
|
wu_table.setItem(row, 3, QtGui.QTableWidgetItem(entry.get("unit") or ""))
|
|
ref_des = entry.get("reference_designators") or []
|
|
wu_table.setItem(row, 4, QtGui.QTableWidgetItem(", ".join(ref_des)))
|
|
wu_table.resizeColumnsToContents()
|
|
|
|
# ── Button handlers ──
|
|
|
|
def on_add():
|
|
add_dlg = QtGui.QDialog(dialog)
|
|
add_dlg.setWindowTitle("Add BOM Entry")
|
|
add_dlg.setMinimumWidth(400)
|
|
al = QtGui.QFormLayout(add_dlg)
|
|
|
|
child_input = QtGui.QLineEdit()
|
|
child_input.setPlaceholderText("e.g. F01-0001")
|
|
al.addRow("Child Part Number:", child_input)
|
|
|
|
type_combo = QtGui.QComboBox()
|
|
type_combo.addItems(["component", "alternate", "reference"])
|
|
al.addRow("Relationship Type:", type_combo)
|
|
|
|
qty_input = QtGui.QLineEdit()
|
|
qty_input.setPlaceholderText("e.g. 4")
|
|
al.addRow("Quantity:", qty_input)
|
|
|
|
unit_input = QtGui.QLineEdit()
|
|
unit_input.setPlaceholderText("e.g. ea, m, kg")
|
|
al.addRow("Unit:", unit_input)
|
|
|
|
refdes_input = QtGui.QLineEdit()
|
|
refdes_input.setPlaceholderText("e.g. R1, R2, R3")
|
|
al.addRow("Ref Designators:", refdes_input)
|
|
|
|
btn_box = QtGui.QDialogButtonBox(
|
|
QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel
|
|
)
|
|
btn_box.accepted.connect(add_dlg.accept)
|
|
btn_box.rejected.connect(add_dlg.reject)
|
|
al.addRow(btn_box)
|
|
|
|
if add_dlg.exec_() != QtGui.QDialog.Accepted:
|
|
return
|
|
|
|
child_pn = child_input.text().strip()
|
|
if not child_pn:
|
|
return
|
|
|
|
qty = None
|
|
qty_text = qty_input.text().strip()
|
|
if qty_text:
|
|
try:
|
|
qty = float(qty_text)
|
|
except ValueError:
|
|
QtGui.QMessageBox.warning(dialog, "BOM", "Quantity must be a number.")
|
|
return
|
|
|
|
unit = unit_input.text().strip() or None
|
|
rel_type = type_combo.currentText()
|
|
|
|
ref_des = None
|
|
refdes_text = refdes_input.text().strip()
|
|
if refdes_text:
|
|
ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()]
|
|
|
|
try:
|
|
_client.add_bom_entry(
|
|
part_number,
|
|
child_pn,
|
|
quantity=qty,
|
|
unit=unit,
|
|
rel_type=rel_type,
|
|
ref_des=ref_des,
|
|
)
|
|
load_bom()
|
|
except Exception as exc:
|
|
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to add entry:\n{exc}")
|
|
|
|
def on_edit():
|
|
selected = bom_table.selectedItems()
|
|
if not selected:
|
|
return
|
|
row = selected[0].row()
|
|
if row < 0 or row >= len(bom_data):
|
|
return
|
|
|
|
entry = bom_data[row]
|
|
child_pn = entry.get("child_part_number", "")
|
|
|
|
edit_dlg = QtGui.QDialog(dialog)
|
|
edit_dlg.setWindowTitle(f"Edit BOM Entry - {child_pn}")
|
|
edit_dlg.setMinimumWidth(400)
|
|
el = QtGui.QFormLayout(edit_dlg)
|
|
|
|
type_combo = QtGui.QComboBox()
|
|
type_combo.addItems(["component", "alternate", "reference"])
|
|
current_type = entry.get("rel_type", "component")
|
|
idx = type_combo.findText(current_type)
|
|
if idx >= 0:
|
|
type_combo.setCurrentIndex(idx)
|
|
el.addRow("Relationship Type:", type_combo)
|
|
|
|
qty_input = QtGui.QLineEdit()
|
|
qty = entry.get("quantity")
|
|
if qty is not None:
|
|
qty_input.setText(str(qty))
|
|
el.addRow("Quantity:", qty_input)
|
|
|
|
unit_input = QtGui.QLineEdit()
|
|
unit_input.setText(entry.get("unit") or "")
|
|
el.addRow("Unit:", unit_input)
|
|
|
|
refdes_input = QtGui.QLineEdit()
|
|
ref_des = entry.get("reference_designators") or []
|
|
refdes_input.setText(", ".join(ref_des))
|
|
el.addRow("Ref Designators:", refdes_input)
|
|
|
|
btn_box = QtGui.QDialogButtonBox(
|
|
QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel
|
|
)
|
|
btn_box.accepted.connect(edit_dlg.accept)
|
|
btn_box.rejected.connect(edit_dlg.reject)
|
|
el.addRow(btn_box)
|
|
|
|
if edit_dlg.exec_() != QtGui.QDialog.Accepted:
|
|
return
|
|
|
|
new_qty = None
|
|
qty_text = qty_input.text().strip()
|
|
if qty_text:
|
|
try:
|
|
new_qty = float(qty_text)
|
|
except ValueError:
|
|
QtGui.QMessageBox.warning(dialog, "BOM", "Quantity must be a number.")
|
|
return
|
|
|
|
new_unit = unit_input.text().strip() or None
|
|
new_type = type_combo.currentText()
|
|
|
|
new_ref_des = None
|
|
refdes_text = refdes_input.text().strip()
|
|
if refdes_text:
|
|
new_ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()]
|
|
else:
|
|
new_ref_des = []
|
|
|
|
try:
|
|
_client.update_bom_entry(
|
|
part_number,
|
|
child_pn,
|
|
quantity=new_qty,
|
|
unit=new_unit,
|
|
rel_type=new_type,
|
|
ref_des=new_ref_des,
|
|
)
|
|
load_bom()
|
|
except Exception as exc:
|
|
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to update entry:\n{exc}")
|
|
|
|
def on_remove():
|
|
selected = bom_table.selectedItems()
|
|
if not selected:
|
|
return
|
|
row = selected[0].row()
|
|
if row < 0 or row >= len(bom_data):
|
|
return
|
|
|
|
entry = bom_data[row]
|
|
child_pn = entry.get("child_part_number", "")
|
|
|
|
reply = QtGui.QMessageBox.question(
|
|
dialog,
|
|
"Remove BOM Entry",
|
|
f"Remove {child_pn} from BOM?",
|
|
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
|
|
)
|
|
if reply != QtGui.QMessageBox.Yes:
|
|
return
|
|
|
|
try:
|
|
_client.delete_bom_entry(part_number, child_pn)
|
|
load_bom()
|
|
except Exception as exc:
|
|
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to remove entry:\n{exc}")
|
|
|
|
add_btn.clicked.connect(on_add)
|
|
edit_btn.clicked.connect(on_edit)
|
|
remove_btn.clicked.connect(on_remove)
|
|
|
|
# Close button
|
|
close_layout = QtGui.QHBoxLayout()
|
|
close_layout.addStretch()
|
|
close_btn = QtGui.QPushButton("Close")
|
|
close_btn.clicked.connect(dialog.accept)
|
|
close_layout.addWidget(close_btn)
|
|
layout.addLayout(close_layout)
|
|
|
|
# Initial data load
|
|
load_bom()
|
|
load_where_used()
|
|
|
|
dialog.exec_()
|
|
|
|
def IsActive(self):
|
|
return FreeCAD.ActiveDocument is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSE live-update listener
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloEventListener(QtCore.QThread):
|
|
"""Background thread that listens to Server-Sent Events from the Silo API.
|
|
|
|
Emits Qt signals when items are updated or new revisions are created.
|
|
Degrades gracefully if the server does not support the ``/api/events``
|
|
endpoint.
|
|
"""
|
|
|
|
item_updated = QtCore.Signal(str) # part_number
|
|
revision_created = QtCore.Signal(str, int) # part_number, revision
|
|
connection_status = QtCore.Signal(str, int, str) # (status, retry_count, error_message)
|
|
server_mode_changed = QtCore.Signal(str) # "normal" / "read-only" / "degraded"
|
|
|
|
_MAX_RETRIES = 10
|
|
_BASE_DELAY = 1 # seconds, doubles each retry
|
|
_MAX_DELAY = 60 # seconds, backoff cap
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
self._stop_flag = False
|
|
self._response = None
|
|
|
|
# -- public API ---------------------------------------------------------
|
|
|
|
def stop(self):
|
|
self._stop_flag = True
|
|
# Close the socket so readline() unblocks immediately
|
|
try:
|
|
if self._response is not None:
|
|
self._response.close()
|
|
except Exception:
|
|
pass
|
|
self.wait(5000)
|
|
|
|
# -- thread entry -------------------------------------------------------
|
|
|
|
def run(self):
|
|
import time
|
|
|
|
retries = 0
|
|
last_error = ""
|
|
while not self._stop_flag:
|
|
t0 = time.monotonic()
|
|
try:
|
|
self._listen()
|
|
# _listen returns normally only on clean EOF / stop
|
|
if self._stop_flag:
|
|
return
|
|
last_error = "connection closed"
|
|
except _SSEUnsupported:
|
|
self.connection_status.emit("unsupported", 0, "")
|
|
return
|
|
except Exception as exc:
|
|
last_error = str(exc) or "unknown error"
|
|
|
|
# Reset retries if the connection was up for a while
|
|
elapsed = time.monotonic() - t0
|
|
if elapsed > 30:
|
|
retries = 0
|
|
retries += 1
|
|
|
|
if retries > self._MAX_RETRIES:
|
|
self.connection_status.emit("gave_up", retries - 1, last_error)
|
|
return
|
|
|
|
self.connection_status.emit("disconnected", retries, last_error)
|
|
|
|
delay = min(self._BASE_DELAY * (2 ** (retries - 1)), self._MAX_DELAY)
|
|
|
|
# Interruptible sleep
|
|
for _ in range(int(delay)):
|
|
if self._stop_flag:
|
|
return
|
|
self.msleep(1000)
|
|
|
|
# -- SSE stream reader --------------------------------------------------
|
|
|
|
def _listen(self):
|
|
url = f"{_get_api_url().rstrip('/')}/events"
|
|
headers = {"Accept": "text/event-stream"}
|
|
headers.update(_get_auth_headers())
|
|
req = urllib.request.Request(url, headers=headers, method="GET")
|
|
|
|
try:
|
|
self._response = urllib.request.urlopen(req, context=_get_ssl_context(), timeout=90)
|
|
except urllib.error.HTTPError as e:
|
|
if e.code in (404, 501):
|
|
raise _SSEUnsupported()
|
|
raise
|
|
except urllib.error.URLError:
|
|
raise
|
|
|
|
self.connection_status.emit("connected", 0, "")
|
|
|
|
event_type = ""
|
|
data_buf = ""
|
|
|
|
for raw_line in self._response:
|
|
if self._stop_flag:
|
|
return
|
|
|
|
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
|
|
|
|
if line == "":
|
|
# Blank line = dispatch event
|
|
if data_buf:
|
|
self._dispatch(event_type or "message", data_buf.strip())
|
|
event_type = ""
|
|
data_buf = ""
|
|
elif line.startswith("event:"):
|
|
event_type = line[6:].strip()
|
|
elif line.startswith("data:"):
|
|
data_buf += line[5:].strip() + "\n"
|
|
# Ignore comments (lines starting with ':') and other fields
|
|
|
|
def _dispatch(self, event_type, data):
|
|
try:
|
|
payload = json.loads(data)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return
|
|
|
|
if event_type == "server.state":
|
|
mode = payload.get("mode", "")
|
|
if mode:
|
|
self.server_mode_changed.emit(mode)
|
|
return
|
|
|
|
pn = payload.get("part_number", "")
|
|
if not pn:
|
|
return
|
|
|
|
if event_type in ("item_updated", "message"):
|
|
self.item_updated.emit(pn)
|
|
elif event_type == "revision_created":
|
|
rev = payload.get("revision", 0)
|
|
self.revision_created.emit(pn, int(rev))
|
|
|
|
|
|
class _SSEUnsupported(Exception):
|
|
"""Raised when the server does not support the SSE endpoint."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth dock widget
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloAuthDockWidget:
|
|
"""Content widget for the Silo Database Auth dock panel."""
|
|
|
|
def __init__(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
self.widget = QtGui.QWidget()
|
|
self._event_listener = None
|
|
self._build_ui()
|
|
self._refresh_status()
|
|
|
|
self._timer = QtCore.QTimer(self.widget)
|
|
self._timer.timeout.connect(self._refresh_status)
|
|
self._timer.start(30000)
|
|
|
|
# -- UI construction ----------------------------------------------------
|
|
|
|
def _build_ui(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
layout = QtGui.QVBoxLayout(self.widget)
|
|
layout.setContentsMargins(8, 8, 8, 8)
|
|
layout.setSpacing(6)
|
|
|
|
# Status row
|
|
status_row = QtGui.QHBoxLayout()
|
|
status_row.setSpacing(6)
|
|
self._status_dot = QtGui.QLabel("\u2b24")
|
|
self._status_dot.setFixedWidth(16)
|
|
self._status_dot.setAlignment(QtCore.Qt.AlignCenter)
|
|
self._status_label = QtGui.QLabel("Checking...")
|
|
status_row.addWidget(self._status_dot)
|
|
status_row.addWidget(self._status_label)
|
|
status_row.addStretch()
|
|
layout.addLayout(status_row)
|
|
|
|
# User row
|
|
user_row = QtGui.QHBoxLayout()
|
|
user_row.setSpacing(6)
|
|
user_lbl = QtGui.QLabel("User:")
|
|
user_lbl.setStyleSheet("color: #888;")
|
|
self._user_label = QtGui.QLabel("(not logged in)")
|
|
user_row.addWidget(user_lbl)
|
|
user_row.addWidget(self._user_label)
|
|
user_row.addStretch()
|
|
layout.addLayout(user_row)
|
|
|
|
# Role row
|
|
role_row = QtGui.QHBoxLayout()
|
|
role_row.setSpacing(6)
|
|
role_lbl = QtGui.QLabel("Role:")
|
|
role_lbl.setStyleSheet("color: #888;")
|
|
self._role_label = QtGui.QLabel("")
|
|
self._role_label.setStyleSheet("font-size: 11px;")
|
|
role_row.addWidget(role_lbl)
|
|
role_row.addWidget(self._role_label)
|
|
role_row.addStretch()
|
|
layout.addLayout(role_row)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
# URL row (compact display)
|
|
url_row = QtGui.QHBoxLayout()
|
|
url_row.setSpacing(6)
|
|
url_lbl = QtGui.QLabel("URL:")
|
|
url_lbl.setStyleSheet("color: #888;")
|
|
self._url_label = QtGui.QLabel("")
|
|
self._url_label.setStyleSheet("font-size: 11px;")
|
|
self._url_label.setWordWrap(True)
|
|
url_row.addWidget(url_lbl)
|
|
url_row.addWidget(self._url_label, 1)
|
|
layout.addLayout(url_row)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
# Live updates row
|
|
sse_row = QtGui.QHBoxLayout()
|
|
sse_row.setSpacing(6)
|
|
sse_lbl = QtGui.QLabel("Live:")
|
|
sse_lbl.setStyleSheet("color: #888;")
|
|
self._sse_label = QtGui.QLabel("")
|
|
self._sse_label.setStyleSheet("font-size: 11px;")
|
|
sse_row.addWidget(sse_lbl)
|
|
sse_row.addWidget(self._sse_label)
|
|
sse_row.addStretch()
|
|
layout.addLayout(sse_row)
|
|
|
|
# Server mode banner (hidden when normal)
|
|
self._mode_banner = QtGui.QLabel("")
|
|
self._mode_banner.setWordWrap(True)
|
|
self._mode_banner.setContentsMargins(6, 4, 6, 4)
|
|
self._mode_banner.setVisible(False)
|
|
layout.addWidget(self._mode_banner)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
# Buttons
|
|
btn_row = QtGui.QHBoxLayout()
|
|
btn_row.setSpacing(6)
|
|
|
|
self._login_btn = QtGui.QPushButton("Login")
|
|
self._login_btn.clicked.connect(self._on_login_clicked)
|
|
btn_row.addWidget(self._login_btn)
|
|
|
|
settings_btn = QtGui.QToolButton()
|
|
settings_btn.setText("\u2699")
|
|
settings_btn.setToolTip("Silo Settings")
|
|
settings_btn.setFixedSize(28, 28)
|
|
settings_btn.clicked.connect(self._on_settings_clicked)
|
|
btn_row.addStretch()
|
|
btn_row.addWidget(settings_btn)
|
|
|
|
layout.addLayout(btn_row)
|
|
layout.addStretch()
|
|
|
|
# -- Status refresh -----------------------------------------------------
|
|
|
|
def _refresh_status(self):
|
|
from PySide import QtGui
|
|
|
|
# Update URL display
|
|
self._url_label.setText(_get_api_url())
|
|
|
|
has_token = _client.is_authenticated()
|
|
username = _get_auth_username()
|
|
role = _get_auth_role()
|
|
source = _get_auth_source()
|
|
|
|
# Check server connectivity
|
|
try:
|
|
reachable, msg = _client.check_connection()
|
|
except Exception:
|
|
reachable = False
|
|
|
|
# If reachable and we have a token, validate it against the server
|
|
authed = False
|
|
if reachable and has_token:
|
|
user = _client.get_current_user()
|
|
if user and user.get("username"):
|
|
authed = True
|
|
username = user["username"]
|
|
role = user.get("role", "")
|
|
source = user.get("auth_source", "")
|
|
_save_auth_info(username=username, role=role, source=source)
|
|
|
|
if authed:
|
|
self._user_label.setText(username)
|
|
role_text = role or ""
|
|
if source:
|
|
role_text += f" ({source})" if role_text else source
|
|
self._role_label.setText(role_text)
|
|
else:
|
|
self._user_label.setText("(not logged in)")
|
|
self._role_label.setText("")
|
|
|
|
# Update button state
|
|
try:
|
|
self._login_btn.clicked.disconnect()
|
|
except RuntimeError:
|
|
pass
|
|
|
|
if reachable and authed:
|
|
self._status_dot.setStyleSheet("color: #4CAF50; font-size: 10px;")
|
|
self._status_label.setText("Connected")
|
|
self._login_btn.setText("Logout")
|
|
self._login_btn.clicked.connect(self._on_logout_clicked)
|
|
elif reachable and has_token and not authed:
|
|
# Token exists but is invalid/expired
|
|
self._status_dot.setStyleSheet("color: #FF9800; font-size: 10px;")
|
|
self._status_label.setText("Token invalid")
|
|
self._login_btn.setText("Login")
|
|
self._login_btn.clicked.connect(self._on_login_clicked)
|
|
elif reachable and not has_token:
|
|
self._status_dot.setStyleSheet("color: #FFC107; font-size: 10px;")
|
|
self._status_label.setText("Connected (no auth)")
|
|
self._login_btn.setText("Login")
|
|
self._login_btn.clicked.connect(self._on_login_clicked)
|
|
else:
|
|
self._status_dot.setStyleSheet("color: #F44336; font-size: 10px;")
|
|
self._status_label.setText("Disconnected")
|
|
self._login_btn.setText("Login")
|
|
self._login_btn.clicked.connect(self._on_login_clicked)
|
|
|
|
# Fetch and display server mode
|
|
global _server_mode
|
|
if reachable:
|
|
_server_mode = _fetch_server_mode()
|
|
else:
|
|
_server_mode = "offline"
|
|
self._update_mode_banner()
|
|
|
|
# Manage SSE listener based on auth state
|
|
self._sync_event_listener(authed)
|
|
|
|
# -- SSE listener management --------------------------------------------
|
|
|
|
def _sync_event_listener(self, authed):
|
|
"""Start or stop the SSE listener depending on authentication state."""
|
|
if authed:
|
|
if self._event_listener is None or not self._event_listener.isRunning():
|
|
self._event_listener = SiloEventListener()
|
|
self._event_listener.item_updated.connect(self._on_remote_change)
|
|
self._event_listener.revision_created.connect(self._on_remote_revision)
|
|
self._event_listener.connection_status.connect(self._on_sse_status)
|
|
self._event_listener.server_mode_changed.connect(self._on_server_mode)
|
|
self._event_listener.start()
|
|
else:
|
|
if self._event_listener is not None and self._event_listener.isRunning():
|
|
self._event_listener.stop()
|
|
self._sse_label.setText("")
|
|
|
|
def _on_sse_status(self, status, retry, error):
|
|
if status == "connected":
|
|
self._sse_label.setText("Listening")
|
|
self._sse_label.setStyleSheet("font-size: 11px; color: #4CAF50;")
|
|
self._sse_label.setToolTip("")
|
|
FreeCAD.Console.PrintMessage("Silo: SSE connected\n")
|
|
elif status == "disconnected":
|
|
self._sse_label.setText(f"Reconnecting ({retry}/{SiloEventListener._MAX_RETRIES})...")
|
|
self._sse_label.setStyleSheet("font-size: 11px; color: #FF9800;")
|
|
self._sse_label.setToolTip(error or "Connection lost")
|
|
FreeCAD.Console.PrintWarning(
|
|
f"Silo: SSE reconnecting ({retry}/{SiloEventListener._MAX_RETRIES}): {error}\n"
|
|
)
|
|
elif status == "gave_up":
|
|
self._sse_label.setText("Disconnected")
|
|
self._sse_label.setStyleSheet("font-size: 11px; color: #F44336;")
|
|
self._sse_label.setToolTip(error or "Max retries reached")
|
|
FreeCAD.Console.PrintError(f"Silo: SSE gave up after {retry} retries: {error}\n")
|
|
elif status == "unsupported":
|
|
self._sse_label.setText("Not available")
|
|
self._sse_label.setStyleSheet("font-size: 11px; color: #888;")
|
|
|
|
def _on_server_mode(self, mode):
|
|
global _server_mode
|
|
_server_mode = mode
|
|
self._update_mode_banner()
|
|
|
|
def _update_mode_banner(self):
|
|
_MODE_BANNERS = {
|
|
"normal": ("", "", False),
|
|
"read-only": (
|
|
"Server is in read-only mode",
|
|
"background: #FFC107; color: #000; padding: 4px; font-size: 11px;",
|
|
True,
|
|
),
|
|
"degraded": (
|
|
"MinIO unavailable \u2014 file ops limited",
|
|
"background: #FF9800; color: #000; padding: 4px; font-size: 11px;",
|
|
True,
|
|
),
|
|
"offline": (
|
|
"Disconnected from silo",
|
|
"background: #F44336; color: #fff; padding: 4px; font-size: 11px;",
|
|
True,
|
|
),
|
|
}
|
|
text, style, visible = _MODE_BANNERS.get(_server_mode, _MODE_BANNERS["offline"])
|
|
self._mode_banner.setText(text)
|
|
self._mode_banner.setStyleSheet(style)
|
|
self._mode_banner.setVisible(visible)
|
|
|
|
def _on_remote_change(self, part_number):
|
|
FreeCAD.Console.PrintMessage(f"Silo: Part {part_number} updated on server\n")
|
|
mw = FreeCADGui.getMainWindow()
|
|
if mw is not None:
|
|
mw.statusBar().showMessage(f"Silo: {part_number} updated on server", 5000)
|
|
self._refresh_activity_panel()
|
|
|
|
def _on_remote_revision(self, part_number, revision):
|
|
FreeCAD.Console.PrintMessage(f"Silo: New revision {revision} for {part_number}\n")
|
|
mw = FreeCADGui.getMainWindow()
|
|
if mw is not None:
|
|
mw.statusBar().showMessage(f"Silo: {part_number} rev {revision} available", 5000)
|
|
self._refresh_activity_panel()
|
|
|
|
def _refresh_activity_panel(self):
|
|
"""Refresh the Database Activity panel if it exists."""
|
|
from PySide import QtCore, QtGui, QtWidgets
|
|
|
|
mw = FreeCADGui.getMainWindow()
|
|
if mw is None:
|
|
return
|
|
panel = mw.findChild(QtWidgets.QDockWidget, "SiloDatabaseActivity")
|
|
if panel is None:
|
|
return
|
|
activity_list = panel.findChild(QtWidgets.QListWidget)
|
|
if activity_list is None:
|
|
return
|
|
activity_list.clear()
|
|
|
|
# Connect interaction signals (once)
|
|
if not getattr(activity_list, "_silo_connected", False):
|
|
activity_list.itemDoubleClicked.connect(self._on_activity_double_click)
|
|
activity_list.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
|
|
activity_list.customContextMenuRequested.connect(
|
|
lambda pos: self._on_activity_context_menu(activity_list, pos)
|
|
)
|
|
activity_list._silo_connected = True
|
|
|
|
# Collect local part numbers for badge
|
|
local_pns = set()
|
|
try:
|
|
for lf in search_local_files():
|
|
local_pns.add(lf.get("part_number", ""))
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
items = _client.list_items()
|
|
if isinstance(items, list):
|
|
for item in items[:20]:
|
|
pn = item.get("part_number", "")
|
|
desc = item.get("description", "")
|
|
updated = item.get("updated_at", "")
|
|
if updated:
|
|
updated = updated[:10]
|
|
|
|
# Fetch latest revision info
|
|
rev_num = ""
|
|
comment = ""
|
|
try:
|
|
revs = _client.get_revisions(pn)
|
|
if revs:
|
|
latest = revs[0] if isinstance(revs, list) else revs
|
|
rev_num = str(latest.get("revision_number", ""))
|
|
comment = latest.get("comment", "") or ""
|
|
except Exception:
|
|
pass
|
|
|
|
# Truncate long descriptions
|
|
desc_display = desc
|
|
if len(desc_display) > 40:
|
|
desc_display = desc_display[:37] + "..."
|
|
|
|
# Build display text
|
|
rev_part = f" \u2013 Rev {rev_num}" if rev_num else ""
|
|
date_part = f" \u2013 {updated}" if updated else ""
|
|
local_badge = " \u25cf local" if pn in local_pns else ""
|
|
line1 = f"{pn} \u2013 {desc_display}{rev_part}{date_part}{local_badge}"
|
|
|
|
if comment:
|
|
line1 += f'\n "{comment}"'
|
|
else:
|
|
line1 += "\n (no comment)"
|
|
|
|
list_item = QtWidgets.QListWidgetItem(line1)
|
|
list_item.setData(QtCore.Qt.UserRole, pn)
|
|
if desc and len(desc) > 40:
|
|
list_item.setToolTip(desc)
|
|
if pn in local_pns:
|
|
list_item.setForeground(QtGui.QColor("#4CAF50"))
|
|
activity_list.addItem(list_item)
|
|
|
|
if activity_list.count() == 0:
|
|
activity_list.addItem("(No items in database)")
|
|
except Exception:
|
|
activity_list.addItem("(Unable to refresh activity)")
|
|
|
|
def _on_activity_double_click(self, item):
|
|
"""Open/checkout item from activity pane."""
|
|
pn = item.data(256) # Qt.UserRole
|
|
if not pn:
|
|
return
|
|
local_path = find_file_by_part_number(pn)
|
|
if local_path and local_path.exists():
|
|
FreeCAD.openDocument(str(local_path))
|
|
else:
|
|
_sync.open_item(pn)
|
|
|
|
def _on_activity_context_menu(self, activity_list, pos):
|
|
"""Show context menu for activity pane items."""
|
|
from PySide import QtCore, QtGui
|
|
|
|
item = activity_list.itemAt(pos)
|
|
if item is None:
|
|
return
|
|
pn = item.data(QtCore.Qt.UserRole)
|
|
if not pn:
|
|
return
|
|
|
|
menu = QtGui.QMenu()
|
|
open_action = menu.addAction("Open in Create")
|
|
browser_action = menu.addAction("Open in Browser")
|
|
copy_action = menu.addAction("Copy Part Number")
|
|
revisions_action = menu.addAction("View Revisions")
|
|
|
|
action = menu.exec_(activity_list.mapToGlobal(pos))
|
|
if action == open_action:
|
|
local_path = find_file_by_part_number(pn)
|
|
if local_path and local_path.exists():
|
|
FreeCAD.openDocument(str(local_path))
|
|
else:
|
|
_sync.open_item(pn)
|
|
elif action == browser_action:
|
|
import webbrowser
|
|
|
|
api_url = _get_api_url().rstrip("/")
|
|
base_url = api_url[:-4] if api_url.endswith("/api") else api_url
|
|
webbrowser.open(f"{base_url}/items/{pn}")
|
|
elif action == copy_action:
|
|
QtGui.QApplication.clipboard().setText(pn)
|
|
elif action == revisions_action:
|
|
FreeCADGui.runCommand("Silo_Info", 0)
|
|
|
|
# -- Actions ------------------------------------------------------------
|
|
|
|
def _on_login_clicked(self):
|
|
self._show_login_dialog()
|
|
|
|
def _on_logout_clicked(self):
|
|
_client.logout()
|
|
FreeCAD.Console.PrintMessage("Silo: Logged out\n")
|
|
self._refresh_status()
|
|
|
|
def _on_settings_clicked(self):
|
|
FreeCADGui.runCommand("Silo_Settings")
|
|
# Refresh after settings may have changed
|
|
self._refresh_status()
|
|
|
|
def _show_login_dialog(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
dialog = QtGui.QDialog(self.widget)
|
|
dialog.setWindowTitle("Silo Login")
|
|
dialog.setMinimumWidth(380)
|
|
|
|
layout = QtGui.QVBoxLayout(dialog)
|
|
|
|
# Server info
|
|
server_label = QtGui.QLabel(f"Server: {_get_api_url()}")
|
|
server_label.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(server_label)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
info_label = QtGui.QLabel(
|
|
"Enter your credentials to create a persistent API token. "
|
|
"Supports local accounts and LDAP (FreeIPA)."
|
|
)
|
|
info_label.setWordWrap(True)
|
|
info_label.setStyleSheet("color: #888; font-size: 11px;")
|
|
layout.addWidget(info_label)
|
|
|
|
layout.addSpacing(8)
|
|
|
|
# Username
|
|
user_label = QtGui.QLabel("Username:")
|
|
layout.addWidget(user_label)
|
|
user_input = QtGui.QLineEdit()
|
|
user_input.setPlaceholderText("Username")
|
|
last_user = _get_auth_username()
|
|
if last_user:
|
|
user_input.setText(last_user)
|
|
layout.addWidget(user_input)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
# Password
|
|
pass_label = QtGui.QLabel("Password:")
|
|
layout.addWidget(pass_label)
|
|
pass_input = QtGui.QLineEdit()
|
|
pass_input.setEchoMode(QtGui.QLineEdit.Password)
|
|
pass_input.setPlaceholderText("Password")
|
|
layout.addWidget(pass_input)
|
|
|
|
layout.addSpacing(4)
|
|
|
|
# Error / status label
|
|
status_label = QtGui.QLabel("")
|
|
status_label.setWordWrap(True)
|
|
status_label.setVisible(False)
|
|
layout.addWidget(status_label)
|
|
|
|
layout.addSpacing(8)
|
|
|
|
# Buttons
|
|
btn_layout = QtGui.QHBoxLayout()
|
|
login_btn = QtGui.QPushButton("Login")
|
|
cancel_btn = QtGui.QPushButton("Cancel")
|
|
btn_layout.addStretch()
|
|
btn_layout.addWidget(login_btn)
|
|
btn_layout.addWidget(cancel_btn)
|
|
layout.addLayout(btn_layout)
|
|
|
|
def on_login():
|
|
username = user_input.text().strip()
|
|
password = pass_input.text()
|
|
if not username or not password:
|
|
status_label.setText("Username and password are required.")
|
|
status_label.setStyleSheet("color: #F44336;")
|
|
status_label.setVisible(True)
|
|
return
|
|
# Disable inputs during login
|
|
login_btn.setEnabled(False)
|
|
status_label.setText("Logging in...")
|
|
status_label.setStyleSheet("color: #888;")
|
|
status_label.setVisible(True)
|
|
# Process events so the user sees the status update
|
|
from PySide.QtWidgets import QApplication
|
|
|
|
QApplication.processEvents()
|
|
try:
|
|
result = _client.login(username, password)
|
|
role = result.get("role", "")
|
|
source = result.get("auth_source", "")
|
|
msg = f"Silo: Logged in as {username}"
|
|
if role:
|
|
msg += f" ({role})"
|
|
if source:
|
|
msg += f" via {source}"
|
|
FreeCAD.Console.PrintMessage(msg + "\n")
|
|
dialog.accept()
|
|
except RuntimeError as e:
|
|
status_label.setText(str(e))
|
|
status_label.setStyleSheet("color: #F44336;")
|
|
status_label.setVisible(True)
|
|
login_btn.setEnabled(True)
|
|
|
|
login_btn.clicked.connect(on_login)
|
|
cancel_btn.clicked.connect(dialog.reject)
|
|
pass_input.returnPressed.connect(on_login)
|
|
user_input.returnPressed.connect(lambda: pass_input.setFocus())
|
|
|
|
dialog.exec_()
|
|
self._refresh_status()
|
|
|
|
|
|
class Silo_Auth:
|
|
"""Show the Silo authentication panel."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Authentication",
|
|
"ToolTip": "Show Silo authentication status and login",
|
|
"Pixmap": _icon("auth"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtGui
|
|
|
|
mw = FreeCADGui.getMainWindow()
|
|
if mw is None:
|
|
return
|
|
panel = mw.findChild(QtGui.QDockWidget, "SiloDatabaseAuth")
|
|
if panel:
|
|
panel.show()
|
|
panel.raise_()
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Start panel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloStartPanel:
|
|
"""Content widget for the Silo Start Panel dock."""
|
|
|
|
def __init__(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
self.widget = QtGui.QWidget()
|
|
self._build_ui()
|
|
self._refresh()
|
|
|
|
self._timer = QtCore.QTimer(self.widget)
|
|
self._timer.timeout.connect(self._refresh)
|
|
self._timer.start(60000) # Refresh every 60 seconds
|
|
|
|
def _build_ui(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
layout = QtGui.QVBoxLayout(self.widget)
|
|
layout.setContentsMargins(8, 8, 8, 8)
|
|
layout.setSpacing(6)
|
|
|
|
# Connection status badge
|
|
status_row = QtGui.QHBoxLayout()
|
|
status_row.setSpacing(6)
|
|
self._conn_dot = QtGui.QLabel("\u2b24")
|
|
self._conn_dot.setFixedWidth(16)
|
|
self._conn_dot.setAlignment(QtCore.Qt.AlignCenter)
|
|
self._conn_label = QtGui.QLabel("Checking...")
|
|
self._conn_label.setStyleSheet("font-weight: bold;")
|
|
status_row.addWidget(self._conn_dot)
|
|
status_row.addWidget(self._conn_label)
|
|
status_row.addStretch()
|
|
layout.addLayout(status_row)
|
|
|
|
layout.addSpacing(8)
|
|
|
|
# My Checkouts section
|
|
checkout_header = QtGui.QLabel("My Checkouts")
|
|
checkout_header.setStyleSheet("font-weight: bold; font-size: 12px;")
|
|
layout.addWidget(checkout_header)
|
|
|
|
self._checkout_list = QtGui.QListWidget()
|
|
self._checkout_list.setMaximumHeight(160)
|
|
self._checkout_list.setAlternatingRowColors(True)
|
|
self._checkout_list.itemDoubleClicked.connect(self._on_checkout_clicked)
|
|
layout.addWidget(self._checkout_list)
|
|
|
|
layout.addSpacing(8)
|
|
|
|
# Recent Silo Activity section
|
|
activity_header = QtGui.QLabel("Recent Silo Activity")
|
|
activity_header.setStyleSheet("font-weight: bold; font-size: 12px;")
|
|
layout.addWidget(activity_header)
|
|
|
|
self._activity_list = QtGui.QListWidget()
|
|
self._activity_list.setMaximumHeight(200)
|
|
self._activity_list.setAlternatingRowColors(True)
|
|
self._activity_list.itemDoubleClicked.connect(self._on_activity_clicked)
|
|
layout.addWidget(self._activity_list)
|
|
|
|
layout.addSpacing(8)
|
|
|
|
# Local Recent Files section
|
|
local_header = QtGui.QLabel("Local Recent Files")
|
|
local_header.setStyleSheet("font-weight: bold; font-size: 12px;")
|
|
layout.addWidget(local_header)
|
|
|
|
self._local_list = QtGui.QListWidget()
|
|
self._local_list.setMaximumHeight(160)
|
|
self._local_list.setAlternatingRowColors(True)
|
|
self._local_list.itemDoubleClicked.connect(self._on_local_clicked)
|
|
layout.addWidget(self._local_list)
|
|
|
|
layout.addStretch()
|
|
|
|
def _refresh(self):
|
|
self._refresh_connection()
|
|
self._refresh_checkouts()
|
|
self._refresh_activity()
|
|
self._refresh_local()
|
|
|
|
def _refresh_connection(self):
|
|
try:
|
|
reachable, _ = _client.check_connection()
|
|
except Exception:
|
|
reachable = False
|
|
|
|
if reachable:
|
|
api_url = _get_api_url()
|
|
parsed = urllib.parse.urlparse(api_url)
|
|
hostname = parsed.hostname or api_url
|
|
self._conn_dot.setStyleSheet("color: #4CAF50; font-size: 10px;")
|
|
self._conn_label.setText(f"Connected to {hostname}")
|
|
else:
|
|
self._conn_dot.setStyleSheet("color: #888; font-size: 10px;")
|
|
self._conn_label.setText("Disconnected")
|
|
|
|
def _refresh_checkouts(self):
|
|
self._checkout_list.clear()
|
|
try:
|
|
local_files = search_local_files()
|
|
for item in local_files[:15]:
|
|
pn = item.get("part_number", "")
|
|
desc = item.get("description", "")
|
|
modified = (item.get("modified") or "")[:10]
|
|
label = f"{pn} {desc}"
|
|
if modified:
|
|
label += f" ({modified})"
|
|
list_item = self._checkout_list.addItem(label)
|
|
except Exception:
|
|
self._checkout_list.addItem("(Unable to scan local files)")
|
|
|
|
if self._checkout_list.count() == 0:
|
|
self._checkout_list.addItem("(No local checkouts)")
|
|
|
|
def _refresh_activity(self):
|
|
self._activity_list.clear()
|
|
try:
|
|
reachable, _ = _client.check_connection()
|
|
except Exception:
|
|
reachable = False
|
|
|
|
if not reachable:
|
|
self._activity_list.addItem("(Not connected)")
|
|
return
|
|
|
|
try:
|
|
items = _client.list_items()
|
|
if isinstance(items, list):
|
|
# Collect local part numbers for badge comparison
|
|
local_pns = set()
|
|
try:
|
|
for lf in search_local_files():
|
|
local_pns.add(lf.get("part_number", ""))
|
|
except Exception:
|
|
pass
|
|
|
|
for item in items[:10]:
|
|
pn = item.get("part_number", "")
|
|
desc = item.get("description", "")
|
|
updated = (item.get("updated_at") or "")[:10]
|
|
badge = "\u2713 " if pn in local_pns else ""
|
|
label = f"{badge}{pn} {desc}"
|
|
if updated:
|
|
label += f" ({updated})"
|
|
self._activity_list.addItem(label)
|
|
|
|
if self._activity_list.count() == 0:
|
|
self._activity_list.addItem("(No items in database)")
|
|
except Exception:
|
|
self._activity_list.addItem("(Unable to fetch activity)")
|
|
|
|
def _refresh_local(self):
|
|
from PySide import QtGui
|
|
|
|
self._local_list.clear()
|
|
try:
|
|
param = FreeCAD.ParamGet("User parameter:BaseApp/RecentFiles")
|
|
count = param.GetInt("RecentFiles", 0)
|
|
for i in range(min(count, 10)):
|
|
path = param.GetString(f"MRU{i}", "")
|
|
if path:
|
|
name = Path(path).name
|
|
item = QtGui.QListWidgetItem(name)
|
|
item.setToolTip(path)
|
|
item.setData(256, path) # Qt.UserRole = 256
|
|
self._local_list.addItem(item)
|
|
except Exception:
|
|
pass
|
|
|
|
if self._local_list.count() == 0:
|
|
self._local_list.addItem("(No recent files)")
|
|
|
|
def _on_checkout_clicked(self, item):
|
|
"""Open a local checkout file."""
|
|
text = item.text()
|
|
if text.startswith("("):
|
|
return
|
|
pn = text.split()[0] if text else ""
|
|
if not pn:
|
|
return
|
|
local_path = find_file_by_part_number(pn)
|
|
if local_path and local_path.exists():
|
|
FreeCAD.openDocument(str(local_path))
|
|
|
|
def _on_activity_clicked(self, item):
|
|
"""Open/checkout a remote item."""
|
|
text = item.text()
|
|
if text.startswith("("):
|
|
return
|
|
# Strip badge if present
|
|
text = text.lstrip("\u2713 ")
|
|
pn = text.split()[0] if text else ""
|
|
if not pn:
|
|
return
|
|
local_path = find_file_by_part_number(pn)
|
|
if local_path and local_path.exists():
|
|
FreeCAD.openDocument(str(local_path))
|
|
else:
|
|
_sync.open_item(pn)
|
|
|
|
def _on_local_clicked(self, item):
|
|
"""Open a local recent file."""
|
|
path = item.data(256) # Qt.UserRole
|
|
if path and Path(path).exists():
|
|
FreeCAD.openDocument(path)
|
|
|
|
|
|
class Silo_StartPanel:
|
|
"""Show the Silo Start Panel."""
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Start Panel",
|
|
"ToolTip": "Show Silo start panel with checkouts and recent activity",
|
|
"Pixmap": _icon("open"),
|
|
}
|
|
|
|
def Activated(self):
|
|
from PySide import QtCore, QtGui
|
|
|
|
mw = FreeCADGui.getMainWindow()
|
|
if mw is None:
|
|
return
|
|
|
|
# Reuse existing panel if it exists
|
|
panel = mw.findChild(QtGui.QDockWidget, "SiloStartPanel")
|
|
if panel:
|
|
panel.show()
|
|
panel.raise_()
|
|
return
|
|
|
|
# Create new dock widget
|
|
content = SiloStartPanel()
|
|
dock = QtGui.QDockWidget("Silo", mw)
|
|
dock.setObjectName("SiloStartPanel")
|
|
dock.setWidget(content.widget)
|
|
dock.setAllowedAreas(QtCore.Qt.LeftDockWidgetArea | QtCore.Qt.RightDockWidgetArea)
|
|
mw.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock)
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
class _DiagWorker(QtCore.QThread):
|
|
"""Background worker that runs connectivity diagnostics."""
|
|
|
|
result = QtCore.Signal(str, bool, str) # (test_name, passed, detail)
|
|
finished_all = QtCore.Signal()
|
|
_HTTP_TIMEOUT = 10
|
|
_DNS_TIMEOUT = 5
|
|
|
|
def run(self):
|
|
api_url = _get_api_url()
|
|
base_url = api_url.rstrip("/")
|
|
if base_url.endswith("/api"):
|
|
base_url = base_url[:-4]
|
|
self._test_dns(base_url)
|
|
self._test_http("Health", f"{base_url}/health", auth=False)
|
|
self._test_http("Ready", f"{base_url}/ready", auth=False)
|
|
self._test_http("Auth", f"{api_url.rstrip('/')}/auth/me", auth=True)
|
|
self._test_sse(api_url)
|
|
self.finished_all.emit()
|
|
|
|
def _test_dns(self, base_url):
|
|
parsed = urllib.parse.urlparse(base_url)
|
|
hostname = parsed.hostname
|
|
if not hostname:
|
|
self.result.emit("DNS", False, "no hostname in URL")
|
|
return
|
|
try:
|
|
addrs = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
|
|
first_ip = addrs[0][4][0] if addrs else "?"
|
|
self.result.emit("DNS", True, f"{hostname} -> {first_ip}")
|
|
except socket.gaierror as e:
|
|
self.result.emit("DNS", False, f"{hostname}: {e}")
|
|
except Exception as e:
|
|
self.result.emit("DNS", False, str(e))
|
|
|
|
def _test_http(self, name, url, auth=False):
|
|
try:
|
|
headers = {}
|
|
if auth:
|
|
headers.update(_get_auth_headers())
|
|
req = urllib.request.Request(url, headers=headers, method="GET")
|
|
resp = urllib.request.urlopen(
|
|
req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
|
|
)
|
|
code = resp.getcode()
|
|
body = resp.read(4096).decode("utf-8", errors="replace").strip()
|
|
detail = f"HTTP {code}"
|
|
try:
|
|
data = json.loads(body)
|
|
if isinstance(data, dict):
|
|
parts = []
|
|
for key in ("status", "username", "role"):
|
|
if key in data:
|
|
parts.append(f"{key}={data[key]}")
|
|
if parts:
|
|
detail += f" ({', '.join(parts)})"
|
|
except (json.JSONDecodeError, ValueError):
|
|
if len(body) <= 80:
|
|
detail += f" {body}"
|
|
self.result.emit(name, code < 400, detail)
|
|
except urllib.error.HTTPError as e:
|
|
self.result.emit(name, False, f"HTTP {e.code} {e.reason}")
|
|
except urllib.error.URLError as e:
|
|
self.result.emit(name, False, str(e.reason))
|
|
except Exception as e:
|
|
self.result.emit(name, False, str(e))
|
|
|
|
def _test_sse(self, api_url):
|
|
url = f"{api_url.rstrip('/')}/events"
|
|
try:
|
|
headers = {"Accept": "text/event-stream"}
|
|
headers.update(_get_auth_headers())
|
|
req = urllib.request.Request(url, headers=headers, method="GET")
|
|
resp = urllib.request.urlopen(
|
|
req, context=_get_ssl_context(), timeout=self._HTTP_TIMEOUT
|
|
)
|
|
content_type = resp.headers.get("Content-Type", "")
|
|
code = resp.getcode()
|
|
resp.close()
|
|
if "text/event-stream" in content_type:
|
|
self.result.emit("SSE", True, f"HTTP {code} event-stream")
|
|
else:
|
|
self.result.emit("SSE", True, f"HTTP {code} (type: {content_type})")
|
|
except urllib.error.HTTPError as e:
|
|
if e.code in (404, 501):
|
|
self.result.emit("SSE", False, f"HTTP {e.code} (not supported)")
|
|
else:
|
|
self.result.emit("SSE", False, f"HTTP {e.code} {e.reason}")
|
|
except urllib.error.URLError as e:
|
|
self.result.emit("SSE", False, str(e.reason))
|
|
except Exception as e:
|
|
self.result.emit("SSE", False, str(e))
|
|
|
|
|
|
class Silo_Diag:
|
|
"""Command to run connection diagnostics."""
|
|
|
|
_worker = None
|
|
|
|
def GetResources(self):
|
|
return {
|
|
"MenuText": "Diagnostics",
|
|
"ToolTip": "Test DNS, health, readiness, auth, and SSE connectivity",
|
|
"Pixmap": _icon("info"),
|
|
}
|
|
|
|
def Activated(self):
|
|
if self._worker is not None and self._worker.isRunning():
|
|
FreeCAD.Console.PrintWarning("Silo: diagnostics already running\n")
|
|
return
|
|
api_url = _get_api_url()
|
|
FreeCAD.Console.PrintMessage(f"--- Silo Diagnostics ({api_url}) ---\n")
|
|
self._worker = _DiagWorker()
|
|
self._worker.result.connect(self._on_result)
|
|
self._worker.finished_all.connect(self._on_finished)
|
|
self._worker.start()
|
|
|
|
def _on_result(self, name, passed, detail):
|
|
if passed:
|
|
FreeCAD.Console.PrintMessage(f" PASS {name}: {detail}\n")
|
|
else:
|
|
FreeCAD.Console.PrintError(f" FAIL {name}: {detail}\n")
|
|
|
|
def _on_finished(self):
|
|
FreeCAD.Console.PrintMessage("--- Diagnostics complete ---\n")
|
|
self._worker = None
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
|
|
# Register commands
|
|
FreeCADGui.addCommand("Silo_Open", Silo_Open())
|
|
FreeCADGui.addCommand("Silo_New", Silo_New())
|
|
FreeCADGui.addCommand("Silo_Save", Silo_Save())
|
|
FreeCADGui.addCommand("Silo_Commit", Silo_Commit())
|
|
FreeCADGui.addCommand("Silo_Pull", Silo_Pull())
|
|
FreeCADGui.addCommand("Silo_Push", Silo_Push())
|
|
FreeCADGui.addCommand("Silo_Info", Silo_Info())
|
|
FreeCADGui.addCommand("Silo_BOM", Silo_BOM())
|
|
FreeCADGui.addCommand("Silo_TagProjects", Silo_TagProjects())
|
|
FreeCADGui.addCommand("Silo_Rollback", Silo_Rollback())
|
|
FreeCADGui.addCommand("Silo_SetStatus", Silo_SetStatus())
|
|
FreeCADGui.addCommand("Silo_Settings", Silo_Settings())
|
|
|
|
FreeCADGui.addCommand("Silo_Auth", Silo_Auth())
|
|
FreeCADGui.addCommand("Silo_StartPanel", Silo_StartPanel())
|
|
FreeCADGui.addCommand("Silo_Diag", Silo_Diag())
|