Files
silo/pkg/freecad/silo_commands.py
Forbes 1478514b13 feat(origin): implement SiloOrigin adapter for unified origin system
Implements Issue #11: Silo origin adapter

This commit creates the SiloOrigin class that implements the FileOrigin
interface introduced in Issue #9, enabling Silo to be used as a document
origin in the unified file origin system.

## SiloOrigin Class (silo_origin.py)

New Python module providing the FileOrigin implementation for Silo PLM:

### Identity Methods
- id(): Returns 'silo' as unique identifier
- name(): Returns 'Kindred Silo' for UI display
- nickname(): Returns 'Silo' for compact UI elements
- icon(): Returns 'silo' icon name
- type(): Returns OriginType.PLM (1)

### Workflow Characteristics
- tracksExternally(): True - Silo tracks documents in database
- requiresAuthentication(): True - Silo requires login

### Capabilities
- supportsRevisions(): True
- supportsBOM(): True
- supportsPartNumbers(): True
- supportsAssemblies(): True

### Connection State
- connectionState(): Checks auth status and API connectivity
- connect(): Triggers Silo_Auth dialog if needed
- disconnect(): Calls _client.logout()

### Document Identity (UUID-based tracking)
- documentIdentity(): Returns SiloItemId (UUID) as primary identity
- documentDisplayId(): Returns SiloPartNumber for human display
- ownsDocument(): True if document has SiloItemId or SiloPartNumber

### Core Operations (delegate to existing commands)
- newDocument(): Delegates to Silo_New command
- openDocument(): Uses find_file_by_part_number or _sync.open_item
- saveDocument(): Saves locally + uploads via _client._upload_file
- saveDocumentAs(): Triggers migration workflow for local docs

### Extended Operations
- commitDocument(): Delegates to Silo_Commit
- pullDocument(): Delegates to Silo_Pull
- pushDocument(): Delegates to Silo_Push
- showInfo(): Delegates to Silo_Info
- showBOM(): Delegates to Silo_BOM

### Module Functions
- get_silo_origin(): Returns singleton instance
- register_silo_origin(): Registers with FreeCADGui.addOrigin()
- unregister_silo_origin(): Cleanup function

## UUID Tracking (silo_commands.py)

Added SiloItemId property to all locations where Silo properties are set:

1. create_document_for_item() - Assembly objects (line 1115)
2. create_document_for_item() - Fallback Part objects (line 1131)
3. create_document_for_item() - Part objects (line 1145)
4. Silo_New.Activated() - Tagged existing objects (line 1471)

The SiloItemId stores the database UUID (Item.ID) which is immutable,
while SiloPartNumber remains the human-readable identifier that could
theoretically change.

Property structure on tracked objects:
- SiloItemId: UUID from database (primary tracking key)
- SiloPartNumber: Human-readable part number
- SiloRevision: Current revision number
- SiloItemType: 'part' or 'assembly'

## Workbench Integration (InitGui.py)

SiloOrigin is automatically registered when the Silo workbench
initializes:

    def Initialize(self):
        import silo_commands
        try:
            import silo_origin
            silo_origin.register_silo_origin()
        except Exception as e:
            FreeCAD.Console.PrintWarning(...)

This makes Silo available as a file origin via:
- FreeCADGui.listOrigins() -> includes 'silo'
- FreeCADGui.getOrigin('silo') -> returns origin info dict
- FreeCADGui.setActiveOrigin('silo') -> sets Silo as active

## Design Decisions

1. **Delegation Pattern**: SiloOrigin delegates to existing Silo
   commands rather than reimplementing logic, ensuring consistency
   and easier maintenance.

2. **UUID as Primary Identity**: documentIdentity() returns UUID
   (SiloItemId) for immutable tracking, while documentDisplayId()
   returns part number for user display.

3. **Graceful Fallback**: If SiloItemId is not present (legacy docs),
   falls back to SiloPartNumber for identity/ownership checks.

4. **Exception Handling**: All operations wrapped in try/except to
   prevent origin system failures from breaking FreeCAD.

Refs: #11
2026-02-05 13:29:45 -06:00

3593 lines
126 KiB
Python

"""Silo FreeCAD commands - Streamlined workflow for CAD file management."""
import json
import os
import re
import ssl
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import FreeCAD
import FreeCADGui
from PySide import QtCore
# Preference group for Kindred Silo settings
_PREF_GROUP = "User parameter:BaseApp/Preferences/Mod/KindredSilo"
# Configuration - preferences take priority over env vars
SILO_PROJECTS_DIR = os.environ.get("SILO_PROJECTS_DIR", os.path.expanduser("~/projects"))
def _get_api_url() -> str:
"""Get Silo API URL from preferences, falling back to env var then default."""
param = FreeCAD.ParamGet(_PREF_GROUP)
url = param.GetString("ApiUrl", "")
if not url:
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
url = url.rstrip("/")
# Auto-append /api when the user provides just a bare origin with no path,
# e.g. "https://silo.kindred.internal" -> "https://silo.kindred.internal/api"
# but leave URLs that already have a path alone,
# e.g. "http://localhost:8080/api" stays as-is.
if url:
parsed = urllib.parse.urlparse(url)
if not parsed.path or parsed.path == "/":
url = url + "/api"
return url
def _get_api_token() -> str:
"""Get Silo API token from preferences, falling back to env var."""
param = FreeCAD.ParamGet(_PREF_GROUP)
token = param.GetString("ApiToken", "")
if not token:
token = os.environ.get("SILO_API_TOKEN", "")
return token
def _get_ssl_verify() -> bool:
"""Get SSL verification setting from preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
return param.GetBool("SslVerify", True)
def _get_ssl_context() -> ssl.SSLContext:
"""Build an SSL context based on the current SSL verification preference."""
if _get_ssl_verify():
ctx = ssl.create_default_context()
# Load custom CA certificate if configured (for internal CAs)
param = FreeCAD.ParamGet(_PREF_GROUP)
custom_cert = param.GetString("SslCertPath", "")
if custom_cert and os.path.isfile(custom_cert):
try:
ctx.load_verify_locations(custom_cert)
except Exception as e:
FreeCAD.Console.PrintWarning(
f"Silo: Failed to load custom cert {custom_cert}: {e}\n"
)
# The bundled Python may not find the system CA store automatically
# (its compiled-in path points to the build environment). Load the
# system CA bundle explicitly so internal CAs (e.g. FreeIPA) are trusted.
for ca_path in (
"/etc/ssl/certs/ca-certificates.crt", # Debian / Ubuntu
"/etc/pki/tls/certs/ca-bundle.crt", # RHEL / CentOS
):
if os.path.isfile(ca_path):
try:
ctx.load_verify_locations(ca_path)
except Exception:
pass
break
return ctx
else:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
def _get_auth_token() -> str:
"""Get the active API token for authenticating requests.
Priority: ApiToken preference > SILO_API_TOKEN env var.
"""
return _get_api_token()
def _get_auth_username() -> str:
"""Get stored authenticated username from preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
return param.GetString("AuthUsername", "")
def _get_auth_role() -> str:
"""Get stored authenticated user role from preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
return param.GetString("AuthRole", "")
def _get_auth_source() -> str:
"""Get stored authentication source from preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
return param.GetString("AuthSource", "")
def _get_auth_headers() -> Dict[str, str]:
"""Return Authorization header dict if a token is configured, else empty."""
token = _get_auth_token()
if token:
return {"Authorization": f"Bearer {token}"}
return {}
def _save_auth_info(username: str, role: str = "", source: str = "", token: str = ""):
"""Store authentication info in preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
param.SetString("AuthUsername", username)
param.SetString("AuthRole", role)
param.SetString("AuthSource", source)
if token:
param.SetString("ApiToken", token)
def _clear_auth():
"""Clear stored authentication credentials from preferences."""
param = FreeCAD.ParamGet(_PREF_GROUP)
param.SetString("ApiToken", "")
param.SetString("AuthUsername", "")
param.SetString("AuthRole", "")
param.SetString("AuthSource", "")
# Category name mapping for folder structure
# Format: CCC -> "descriptive_name"
CATEGORY_NAMES = {
# Fasteners
"F01": "screws_bolts",
"F02": "threaded_rods",
"F03": "eyebolts",
"F04": "u_bolts",
"F05": "nuts",
"F06": "washers",
"F07": "shims",
"F08": "inserts",
"F09": "spacers",
"F10": "pins",
"F11": "anchors",
"F12": "nails",
"F13": "rivets",
"F14": "staples",
"F15": "key_stock",
"F16": "retaining_rings",
"F17": "cable_ties",
"F18": "hook_loop",
# Fluid Fittings
"C01": "full_couplings",
"C02": "half_couplings",
"C03": "reducers",
"C04": "elbows",
"C05": "tees",
"C06": "crosses",
"C07": "unions",
"C08": "adapters",
"C09": "plugs_caps",
"C10": "nipples",
"C11": "flanges",
"C12": "valves",
"C13": "quick_disconnects",
"C14": "hose_barbs",
"C15": "compression_fittings",
"C16": "tubing",
"C17": "hoses",
# Motion Components
"R01": "ball_bearings",
"R02": "roller_bearings",
"R03": "sleeve_bearings",
"R04": "thrust_bearings",
"R05": "linear_bearings",
"R06": "spur_gears",
"R07": "helical_gears",
"R08": "bevel_gears",
"R09": "worm_gears",
"R10": "rack_pinion",
"R11": "sprockets",
"R12": "timing_pulleys",
"R13": "v_belt_pulleys",
"R14": "idler_pulleys",
"R15": "wheels",
"R16": "casters",
"R17": "shaft_couplings",
"R18": "clutches",
"R19": "brakes",
"R20": "lead_screws",
"R21": "ball_screws",
"R22": "linear_rails",
"R23": "linear_actuators",
"R24": "brushed_dc_motor",
"R25": "brushless_dc_motor",
"R26": "stepper_motor",
"R27": "servo_motor",
"R28": "ac_induction_motor",
"R29": "gear_motor",
"R30": "motor_driver",
"R31": "motor_controller",
"R32": "encoder",
"R33": "pneumatic_cylinder",
"R34": "pneumatic_actuator",
"R35": "pneumatic_valve",
"R36": "pneumatic_regulator",
"R37": "pneumatic_frl_unit",
"R38": "air_compressor",
"R39": "vacuum_pump",
"R40": "hydraulic_cylinder",
"R41": "hydraulic_pump",
"R42": "hydraulic_motor",
"R43": "hydraulic_valve",
"R44": "hydraulic_accumulator",
# Structural Materials
"S01": "square_tube",
"S02": "round_tube",
"S03": "rectangular_tube",
"S04": "i_beam",
"S05": "t_slot_extrusion",
"S06": "angle",
"S07": "channel",
"S08": "flat_bar",
"S09": "round_bar",
"S10": "square_bar",
"S11": "hex_bar",
"S12": "sheet_metal",
"S13": "plate",
"S14": "expanded_metal",
"S15": "perforated_sheet",
"S16": "wire_mesh",
"S17": "grating",
# Electrical Components
"E01": "wire",
"E02": "cable",
"E03": "connectors",
"E04": "terminals",
"E05": "circuit_breakers",
"E06": "fuses",
"E07": "relays",
"E08": "contactors",
"E09": "switches",
"E10": "buttons",
"E11": "indicators",
"E12": "resistors",
"E13": "capacitors",
"E14": "inductors",
"E15": "transformers",
"E16": "diodes",
"E17": "transistors",
"E18": "ics",
"E19": "microcontrollers",
"E20": "sensors",
"E21": "displays",
"E22": "power_supplies",
"E23": "batteries",
"E24": "pcb",
"E25": "enclosures",
"E26": "heat_sinks",
"E27": "fans",
# Mechanical Components
"M01": "compression_springs",
"M02": "extension_springs",
"M03": "torsion_springs",
"M04": "gas_springs",
"M05": "dampers",
"M06": "shock_absorbers",
"M07": "vibration_mounts",
"M08": "hinges",
"M09": "latches",
"M10": "handles",
"M11": "knobs",
"M12": "levers",
"M13": "linkages",
"M14": "cams",
"M15": "bellows",
"M16": "seals",
"M17": "o_rings",
"M18": "gaskets",
# Tooling and Fixtures
"T01": "jigs",
"T02": "fixtures",
"T03": "molds",
"T04": "dies",
"T05": "gauges",
"T06": "templates",
"T07": "work_holding",
"T08": "test_fixtures",
# Assemblies
"A01": "mechanical_assembly",
"A02": "electrical_assembly",
"A03": "electromechanical_assembly",
"A04": "subassembly",
"A05": "cable_assembly",
"A06": "pneumatic_assembly",
"A07": "hydraulic_assembly",
# Purchased/Off-the-Shelf
"P01": "purchased_mechanical",
"P02": "purchased_electrical",
"P03": "purchased_assembly",
"P04": "raw_material",
"P05": "consumables",
# Custom Fabricated Parts
"X01": "machined_part",
"X02": "sheet_metal_part",
"X03": "3d_printed_part",
"X04": "cast_part",
"X05": "molded_part",
"X06": "welded_fabrication",
"X07": "laser_cut_part",
"X08": "waterjet_cut_part",
}
# Icon directory - resolve relative to this file so it works regardless of install location
_ICON_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", "icons")
def _icon(name):
"""Get icon path by name."""
if _ICON_DIR:
path = os.path.join(_ICON_DIR, f"silo-{name}.svg")
if os.path.exists(path):
return path
return ""
def get_projects_dir() -> Path:
"""Get the projects directory."""
projects_dir = Path(SILO_PROJECTS_DIR)
projects_dir.mkdir(parents=True, exist_ok=True)
return projects_dir
class SiloClient:
"""HTTP client for Silo API."""
def __init__(self, base_url: str = None):
self._explicit_url = base_url
@property
def base_url(self) -> str:
if self._explicit_url:
return self._explicit_url.rstrip("/")
return _get_api_url().rstrip("/")
def _request(self, method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""Make HTTP request to Silo API."""
url = f"{self.base_url}{path}"
headers = {"Content-Type": "application/json"}
headers.update(_get_auth_headers())
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
_clear_auth()
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download_file(
self,
part_number: str,
revision: int,
dest_path: str,
progress_callback=None,
) -> bool:
"""Download a file from MinIO storage.
Args:
progress_callback: Optional callable(bytes_downloaded, total_bytes).
total_bytes is -1 if the server did not send Content-Length.
"""
url = f"{self.base_url}/items/{part_number}/file/{revision}"
req = urllib.request.Request(url, headers=_get_auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
total = int(resp.headers.get("Content-Length", -1))
downloaded = 0
with open(dest_path, "wb") as f:
while True:
chunk = resp.read(8192)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if progress_callback is not None:
progress_callback(downloaded, total)
return True
except urllib.error.HTTPError as e:
if e.code == 404:
return False
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_file(
self, part_number: str, file_path: str, properties: Dict, comment: str = ""
) -> Dict[str, Any]:
"""Upload a file and create a new revision."""
import mimetypes
url = f"{self.base_url}/items/{part_number}/file"
with open(file_path, "rb") as f:
file_data = f.read()
boundary = "----SiloUploadBoundary" + str(hash(file_path))[-8:]
body_parts = []
filename = os.path.basename(file_path)
content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
body_parts.append(
f'--{boundary}\r\nContent-Disposition: form-data; name="file"; filename="{filename}"\r\nContent-Type: {content_type}\r\n\r\n'
)
body_parts.append(file_data)
body_parts.append(b"\r\n")
if comment:
body_parts.append(
f'--{boundary}\r\nContent-Disposition: form-data; name="comment"\r\n\r\n{comment}\r\n'
)
if properties:
# Ensure properties is valid JSON - handle special float values
props_json = json.dumps(properties, allow_nan=False, default=str)
body_parts.append(
f'--{boundary}\r\nContent-Disposition: form-data; name="properties"\r\n\r\n{props_json}\r\n'
)
body_parts.append(f"--{boundary}--\r\n")
body = b""
for part in body_parts:
body += part.encode("utf-8") if isinstance(part, str) else part
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(_get_auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def get_item(self, part_number: str) -> Dict[str, Any]:
return self._request("GET", f"/items/{part_number}")
def list_items(self, search: str = "", item_type: str = "", project: str = "") -> list:
params = ["limit=100"]
if search:
params.append(f"search={urllib.parse.quote(search)}")
if item_type:
params.append(f"type={item_type}")
if project:
params.append(f"project={project}")
return self._request("GET", "/items?" + "&".join(params))
def create_item(
self,
schema: str,
category: str,
description: str = "",
projects: List[str] = None,
) -> Dict[str, Any]:
"""Create a new item with optional project tags."""
data = {
"schema": schema,
"category": category,
"description": description,
}
if projects:
data["projects"] = projects
return self._request("POST", "/items", data)
def update_item(
self, part_number: str, description: str = None, item_type: str = None
) -> Dict[str, Any]:
data = {}
if description is not None:
data["description"] = description
if item_type is not None:
data["item_type"] = item_type
return self._request("PUT", f"/items/{part_number}", data)
def get_revisions(self, part_number: str) -> list:
return self._request("GET", f"/items/{part_number}/revisions")
def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
return self._request("GET", f"/schemas/{name}")
def get_projects(self) -> list:
"""Get list of all projects."""
return self._request("GET", "/projects")
def get_item_projects(self, part_number: str) -> list:
"""Get projects associated with an item."""
return self._request("GET", f"/items/{part_number}/projects")
def add_item_projects(self, part_number: str, project_codes: List[str]) -> Dict[str, Any]:
"""Add project tags to an item."""
return self._request("POST", f"/items/{part_number}/projects", {"projects": project_codes})
def has_file(self, part_number: str) -> Tuple[bool, Optional[int]]:
"""Check if item has files in MinIO."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
return True, rev["revision_number"]
return False, None
except Exception:
return False, None
def latest_file_revision(self, part_number: str) -> Optional[Dict]:
"""Return the most recent revision that has a file attached, or None."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions: # revisions come newest-first from the API
if rev.get("file_key"):
return rev
return None
except Exception:
return None
def compare_revisions(self, part_number: str, from_rev: int, to_rev: int) -> Dict[str, Any]:
"""Compare two revisions and return differences."""
return self._request(
"GET",
f"/items/{part_number}/revisions/compare?from={from_rev}&to={to_rev}",
)
def rollback_revision(
self, part_number: str, revision: int, comment: str = ""
) -> Dict[str, Any]:
"""Create a new revision by rolling back to a previous one."""
data = {}
if comment:
data["comment"] = comment
return self._request("POST", f"/items/{part_number}/revisions/{revision}/rollback", data)
def update_revision(
self, part_number: str, revision: int, status: str = None, labels: list = None
) -> Dict[str, Any]:
"""Update revision status and/or labels."""
data = {}
if status:
data["status"] = status
if labels is not None:
data["labels"] = labels
return self._request("PATCH", f"/items/{part_number}/revisions/{revision}", data)
# BOM / Relationship methods
def get_bom(self, part_number: str) -> list:
"""Get single-level BOM for an item."""
return self._request("GET", f"/items/{part_number}/bom")
def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
"""Get multi-level BOM for an item."""
return self._request("GET", f"/items/{part_number}/bom/expanded?depth={depth}")
def get_bom_where_used(self, part_number: str) -> list:
"""Get assemblies that use this item."""
return self._request("GET", f"/items/{part_number}/bom/where-used")
def add_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: float = None,
unit: str = None,
rel_type: str = "component",
ref_des: list = None,
) -> Dict[str, Any]:
"""Add a child item to a parent's BOM."""
data: Dict[str, Any] = {
"child_part_number": child_pn,
"rel_type": rel_type,
}
if quantity is not None:
data["quantity"] = quantity
if unit:
data["unit"] = unit
if ref_des:
data["reference_designators"] = ref_des
return self._request("POST", f"/items/{parent_pn}/bom", data)
def update_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: float = None,
unit: str = None,
rel_type: str = None,
ref_des: list = None,
) -> Dict[str, Any]:
"""Update a BOM entry."""
data: Dict[str, Any] = {}
if quantity is not None:
data["quantity"] = quantity
if unit is not None:
data["unit"] = unit
if rel_type is not None:
data["rel_type"] = rel_type
if ref_des is not None:
data["reference_designators"] = ref_des
return self._request("PUT", f"/items/{parent_pn}/bom/{child_pn}", data)
def delete_bom_entry(self, parent_pn: str, child_pn: str) -> None:
"""Remove a child from a parent's BOM."""
url = f"{self.base_url}/items/{parent_pn}/bom/{child_pn}"
headers = {"Content-Type": "application/json"}
headers.update(_get_auth_headers())
req = urllib.request.Request(
url,
headers=headers,
method="DELETE",
)
try:
urllib.request.urlopen(req, context=_get_ssl_context())
except urllib.error.HTTPError as e:
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Authentication methods ---------------------------------------------
def login(self, username: str, password: str) -> Dict[str, Any]:
"""Authenticate with credentials and obtain an API token.
Performs a session-based login (POST /login), then uses the
session to create a persistent API token via POST /api/auth/tokens.
The API token is stored in preferences for future requests.
"""
import http.cookiejar
# Build a cookie-aware opener for the session flow
base = self.base_url
# Strip /api suffix to get the server root for /login
origin = base.rsplit("/api", 1)[0] if base.endswith("/api") else base
ctx = _get_ssl_context()
cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(cookie_jar),
urllib.request.HTTPSHandler(context=ctx),
)
# Step 1: POST form-encoded credentials to /login
login_url = f"{origin}/login"
form_data = urllib.parse.urlencode({"username": username, "password": password}).encode()
req = urllib.request.Request(
login_url,
data=form_data,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
opener.open(req)
except urllib.error.HTTPError as e:
if e.code in (302, 303):
pass # Redirect after login is expected
else:
raise RuntimeError(
f"Login failed (HTTP {e.code}): invalid credentials or server error"
)
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# Step 2: Verify session by calling /api/auth/me
me_url = f"{origin}/api/auth/me"
me_req = urllib.request.Request(me_url, method="GET")
try:
with opener.open(me_req) as resp:
user_info = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
raise RuntimeError("Login failed: invalid username or password")
raise RuntimeError(f"Login verification failed (HTTP {e.code})")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# Step 3: Create a persistent API token for FreeCAD
token_url = f"{origin}/api/auth/tokens"
import socket
hostname = socket.gethostname()
token_body = json.dumps({"name": f"FreeCAD ({hostname})", "expires_in_days": 90}).encode()
token_req = urllib.request.Request(
token_url,
data=token_body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with opener.open(token_req) as resp:
token_result = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Failed to create API token (HTTP {e.code})")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
raw_token = token_result.get("token", "")
if not raw_token:
raise RuntimeError("Server did not return an API token")
# Store token and user info
_save_auth_info(
username=user_info.get("username", username),
role=user_info.get("role", ""),
source=user_info.get("auth_source", ""),
token=raw_token,
)
return {
"username": user_info.get("username", username),
"role": user_info.get("role", ""),
"auth_source": user_info.get("auth_source", ""),
"token_name": token_result.get("name", ""),
"token_prefix": token_result.get("token_prefix", ""),
}
def logout(self):
"""Clear stored API token and authentication info."""
_clear_auth()
def is_authenticated(self) -> bool:
"""Return True if a valid API token is configured."""
return bool(_get_auth_token())
def auth_username(self) -> str:
"""Return the stored authenticated username."""
return _get_auth_username()
def auth_role(self) -> str:
"""Return the stored user role."""
return _get_auth_role()
def auth_source(self) -> str:
"""Return the stored authentication source (local, ldap, oidc)."""
return _get_auth_source()
def get_current_user(self) -> Optional[Dict[str, Any]]:
"""Fetch the current user info from the server.
Returns user dict or None if not authenticated.
"""
try:
return self._request("GET", "/auth/me")
except RuntimeError:
return None
def refresh_auth_info(self) -> bool:
"""Refresh locally cached user info from the server.
Returns True if authenticated, False otherwise.
"""
user = self.get_current_user()
if user and user.get("username"):
_save_auth_info(
username=user["username"],
role=user.get("role", ""),
source=user.get("auth_source", ""),
)
return True
return False
def list_tokens(self) -> List[Dict[str, Any]]:
"""List API tokens for the current user."""
return self._request("GET", "/auth/tokens")
def create_token(self, name: str, expires_in_days: Optional[int] = None) -> Dict[str, Any]:
"""Create a new API token.
Returns dict with 'token' (raw, shown once), 'id', 'name', etc.
"""
data: Dict[str, Any] = {"name": name}
if expires_in_days is not None:
data["expires_in_days"] = expires_in_days
return self._request("POST", "/auth/tokens", data)
def revoke_token(self, token_id: str) -> None:
"""Revoke an API token by its ID."""
url = f"{self.base_url}/auth/tokens/{token_id}"
headers = {"Content-Type": "application/json"}
headers.update(_get_auth_headers())
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
urllib.request.urlopen(req, context=_get_ssl_context())
except urllib.error.HTTPError as e:
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def check_connection(self) -> Tuple[bool, str]:
"""Check connectivity to the Silo API.
Returns (reachable, message).
"""
# Use origin /health (not /api/health) since health is at root
base = self.base_url
origin = base.rsplit("/api", 1)[0] if base.endswith("/api") else base
url = f"{origin}/health"
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(req, context=_get_ssl_context(), timeout=5) as resp:
return True, f"OK ({resp.status})"
except urllib.error.HTTPError as e:
return True, f"Server error ({e.code})"
except urllib.error.URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
_client = SiloClient()
# Utility functions
def sanitize_filename(name: str) -> str:
"""Sanitize a string for use in filenames."""
sanitized = re.sub(r'[<>:"/\\|?*]', "_", name)
sanitized = re.sub(r"[\s_]+", "_", sanitized)
sanitized = sanitized.strip("_ ")
return sanitized[:50]
def parse_part_number(part_number: str) -> Tuple[str, str]:
"""Parse part number into (category, sequence).
New format: CCC-NNNN (e.g., F01-0001)
Returns: (category_code, sequence)
"""
parts = part_number.split("-")
if len(parts) >= 2:
return parts[0], parts[1]
return part_number, ""
def get_category_folder_name(category_code: str) -> str:
"""Get the folder name for a category (e.g., 'F01_screws_bolts')."""
name = CATEGORY_NAMES.get(category_code.upper(), "misc")
return f"{category_code}_{name}"
def get_cad_file_path(part_number: str, description: str = "") -> Path:
"""Generate canonical file path for a CAD file.
Path format: ~/projects/cad/{category_code}_{category_name}/{part_number}_{description}.FCStd
Example: ~/projects/cad/F01_screws_bolts/F01-0001_M3_Socket_Screw.FCStd
"""
category, _ = parse_part_number(part_number)
folder_name = get_category_folder_name(category)
if description:
filename = f"{part_number}_{sanitize_filename(description)}.FCStd"
else:
filename = f"{part_number}.FCStd"
return get_projects_dir() / "cad" / folder_name / filename
def find_file_by_part_number(part_number: str) -> Optional[Path]:
"""Find existing CAD file for a part number."""
category, _ = parse_part_number(part_number)
folder_name = get_category_folder_name(category)
cad_dir = get_projects_dir() / "cad" / folder_name
if cad_dir.exists():
matches = list(cad_dir.glob(f"{part_number}*.FCStd"))
if matches:
return matches[0]
# Also search in base cad directory (for older files or different structures)
base_cad_dir = get_projects_dir() / "cad"
if base_cad_dir.exists():
# Search all subdirectories
for subdir in base_cad_dir.iterdir():
if subdir.is_dir():
matches = list(subdir.glob(f"{part_number}*.FCStd"))
if matches:
return matches[0]
return None
def search_local_files(search_term: str = "", category_filter: str = "") -> list:
"""Search for CAD files in local cad directory."""
results = []
cad_dir = get_projects_dir() / "cad"
if not cad_dir.exists():
return results
search_lower = search_term.lower()
for category_dir in cad_dir.iterdir():
if not category_dir.is_dir():
continue
# Extract category code from folder name (e.g., "F01_screws_bolts" -> "F01")
folder_name = category_dir.name
category_code = folder_name.split("_")[0] if "_" in folder_name else folder_name
if category_filter and category_code.upper() != category_filter.upper():
continue
for fcstd_file in category_dir.glob("*.FCStd"):
filename = fcstd_file.stem
parts = filename.split("_", 1)
part_number = parts[0]
description = parts[1].replace("_", " ") if len(parts) > 1 else ""
if search_term:
searchable = f"{part_number} {description}".lower()
if search_lower not in searchable:
continue
try:
from datetime import datetime
mtime = fcstd_file.stat().st_mtime
modified = datetime.fromtimestamp(mtime).isoformat()
except Exception:
modified = None
results.append(
{
"path": str(fcstd_file),
"part_number": part_number,
"description": description,
"category": category_code,
"modified": modified,
"source": "local",
}
)
results.sort(key=lambda x: x.get("modified") or "", reverse=True)
return results
def _safe_float(val):
"""Convert float to JSON-safe value, handling NaN and Infinity."""
import math
if isinstance(val, float):
if math.isnan(val) or math.isinf(val):
return 0.0
return val
def collect_document_properties(doc) -> Dict[str, Any]:
"""Collect properties from all objects in a document."""
result = {
"_document_name": doc.Name,
"_file_name": doc.FileName or None,
"objects": {},
}
for obj in doc.Objects:
if obj.TypeId.startswith("App::") and obj.TypeId not in ("App::Part",):
continue
props = {"_object_type": obj.TypeId, "_label": obj.Label}
if hasattr(obj, "Placement"):
p = obj.Placement
props["placement"] = {
"position": {
"x": _safe_float(p.Base.x),
"y": _safe_float(p.Base.y),
"z": _safe_float(p.Base.z),
},
"rotation": {
"axis": {
"x": _safe_float(p.Rotation.Axis.x),
"y": _safe_float(p.Rotation.Axis.y),
"z": _safe_float(p.Rotation.Axis.z),
},
"angle": _safe_float(p.Rotation.Angle),
},
}
if hasattr(obj, "Shape") and obj.Shape:
try:
bbox = obj.Shape.BoundBox
props["bounding_box"] = {
"x_length": _safe_float(bbox.XLength),
"y_length": _safe_float(bbox.YLength),
"z_length": _safe_float(bbox.ZLength),
}
if hasattr(obj.Shape, "Volume"):
props["volume"] = _safe_float(obj.Shape.Volume)
except Exception:
pass
result["objects"][obj.Label] = props
return result
def set_silo_properties(obj, props: Dict[str, Any]):
"""Set Silo properties on FreeCAD object."""
for name, value in props.items():
if not hasattr(obj, name):
if isinstance(value, str):
obj.addProperty("App::PropertyString", name, "Silo", "")
elif isinstance(value, int):
obj.addProperty("App::PropertyInteger", name, "Silo", "")
setattr(obj, name, value)
def get_tracked_object(doc):
"""Find the primary tracked object in a document."""
for obj in doc.Objects:
if hasattr(obj, "SiloPartNumber") and obj.SiloPartNumber:
return obj
return None
class SiloSync:
"""Handles synchronization between FreeCAD and Silo."""
def __init__(self, client: SiloClient = None):
self.client = client or _client
def save_to_canonical_path(self, doc, force_rename: bool = False) -> Optional[Path]:
"""Save document to canonical path."""
obj = get_tracked_object(doc)
if not obj:
return None
part_number = obj.SiloPartNumber
try:
item = self.client.get_item(part_number)
description = item.get("description", "")
new_path = get_cad_file_path(part_number, description)
new_path.parent.mkdir(parents=True, exist_ok=True)
existing_path = find_file_by_part_number(part_number)
current_path = Path(doc.FileName) if doc.FileName else None
# Use save() if already at the correct path, saveAs() only if path changes
if current_path and current_path == new_path:
doc.save()
elif (
existing_path
and existing_path != new_path
and (force_rename or current_path == existing_path)
):
doc.saveAs(str(new_path))
try:
existing_path.unlink()
except OSError:
pass
else:
doc.saveAs(str(new_path))
return new_path
except Exception as e:
FreeCAD.Console.PrintError(f"Save failed: {e}\n")
return None
def create_document_for_item(self, item: Dict[str, Any], save: bool = True):
"""Create a new FreeCAD document for a database item."""
part_number = item.get("part_number", "")
description = item.get("description", "")
item_type = item.get("item_type", "part")
if not part_number:
return None
doc = FreeCAD.newDocument(part_number)
safe_name = "_" + part_number
if item_type == "assembly":
# Create an Assembly object for assembly items (FreeCAD 1.0+)
try:
assembly_obj = doc.addObject("Assembly::AssemblyObject", safe_name)
assembly_obj.Label = part_number
set_silo_properties(
assembly_obj,
{
"SiloItemId": item.get("id", ""),
"SiloPartNumber": part_number,
"SiloRevision": item.get("current_revision", 1),
"SiloItemType": item_type,
},
)
except Exception as e:
# Fallback to App::Part if Assembly workbench not available
FreeCAD.Console.PrintWarning(
f"Assembly workbench not available, using App::Part: {e}\n"
)
part_obj = doc.addObject("App::Part", safe_name)
part_obj.Label = part_number
set_silo_properties(
part_obj,
{
"SiloItemId": item.get("id", ""),
"SiloPartNumber": part_number,
"SiloRevision": item.get("current_revision", 1),
"SiloItemType": item_type,
},
)
else:
# Create a Part container for non-assembly items
part_obj = doc.addObject("App::Part", safe_name)
part_obj.Label = part_number
set_silo_properties(
part_obj,
{
"SiloItemId": item.get("id", ""),
"SiloPartNumber": part_number,
"SiloRevision": item.get("current_revision", 1),
"SiloItemType": item_type,
},
)
# Add a Body for parts (not assemblies)
body_label = sanitize_filename(description) if description else "Body"
body = doc.addObject("PartDesign::Body", "_" + body_label)
body.Label = body_label
part_obj.addObject(body)
doc.recompute()
if save:
file_path = get_cad_file_path(part_number, description)
file_path.parent.mkdir(parents=True, exist_ok=True)
doc.saveAs(str(file_path))
return doc
def open_item(self, part_number: str):
"""Open or create item document."""
existing_path = find_file_by_part_number(part_number)
if existing_path and existing_path.exists():
return FreeCAD.openDocument(str(existing_path))
try:
item = self.client.get_item(part_number)
return self.create_document_for_item(item, save=True)
except Exception as e:
FreeCAD.Console.PrintError(f"Failed to open: {e}\n")
return None
def upload_file(
self, part_number: str, file_path: str, comment: str = "Auto-save"
) -> Optional[Dict]:
"""Upload file to MinIO."""
try:
doc = FreeCAD.openDocument(file_path)
if not doc:
return None
properties = collect_document_properties(doc)
FreeCAD.closeDocument(doc.Name)
return self.client._upload_file(part_number, file_path, properties, comment)
except Exception as e:
FreeCAD.Console.PrintError(f"Upload failed: {e}\n")
return None
def download_file(self, part_number: str) -> Optional[Path]:
"""Download latest file from MinIO."""
try:
item = self.client.get_item(part_number)
file_path = get_cad_file_path(part_number, item.get("description", ""))
file_path.parent.mkdir(parents=True, exist_ok=True)
revisions = self.client.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
if self.client._download_file(
part_number, rev["revision_number"], str(file_path)
):
return file_path
return None
except Exception as e:
FreeCAD.Console.PrintError(f"Download failed: {e}\n")
return None
_sync = SiloSync()
# ============================================================================
# COMMANDS
# ============================================================================
class Silo_Open:
"""Open item - combined search and open dialog."""
def GetResources(self):
return {
"MenuText": "Open",
"ToolTip": "Search and open items (Ctrl+O)",
"Pixmap": _icon("open"),
}
def Activated(self):
from PySide import QtCore, QtGui
dialog = QtGui.QDialog()
dialog.setWindowTitle("Silo - Open Item")
dialog.setMinimumWidth(700)
dialog.setMinimumHeight(500)
layout = QtGui.QVBoxLayout(dialog)
# Search row
search_layout = QtGui.QHBoxLayout()
search_input = QtGui.QLineEdit()
search_input.setPlaceholderText("Search by part number or description...")
search_layout.addWidget(search_input)
layout.addLayout(search_layout)
# Filters
filter_layout = QtGui.QHBoxLayout()
db_checkbox = QtGui.QCheckBox("Database")
db_checkbox.setChecked(True)
local_checkbox = QtGui.QCheckBox("Local Files")
local_checkbox.setChecked(True)
filter_layout.addWidget(db_checkbox)
filter_layout.addWidget(local_checkbox)
filter_layout.addStretch()
layout.addLayout(filter_layout)
# Results table
results_table = QtGui.QTableWidget()
results_table.setColumnCount(5)
results_table.setHorizontalHeaderLabels(
["Part Number", "Description", "Type", "Source", "Modified"]
)
results_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
results_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
results_table.horizontalHeader().setStretchLastSection(True)
results_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
layout.addWidget(results_table)
results_data = []
def do_search():
nonlocal results_data
search_term = search_input.text().strip()
results_data = []
results_table.setRowCount(0)
if db_checkbox.isChecked():
try:
for item in _client.list_items(search=search_term):
results_data.append(
{
"part_number": item.get("part_number", ""),
"description": item.get("description", ""),
"item_type": item.get("item_type", ""),
"source": "database",
"modified": item.get("updated_at", "")[:10]
if item.get("updated_at")
else "",
"path": None,
}
)
except Exception as e:
FreeCAD.Console.PrintWarning(f"DB search failed: {e}\n")
if local_checkbox.isChecked():
try:
for item in search_local_files(search_term):
existing = next(
(r for r in results_data if r["part_number"] == item["part_number"]),
None,
)
if existing:
existing["source"] = "both"
existing["path"] = item.get("path")
else:
results_data.append(
{
"part_number": item.get("part_number", ""),
"description": item.get("description", ""),
"item_type": "",
"source": "local",
"modified": item.get("modified", "")[:10]
if item.get("modified")
else "",
"path": item.get("path"),
}
)
except Exception as e:
FreeCAD.Console.PrintWarning(f"Local search failed: {e}\n")
results_table.setRowCount(len(results_data))
for row, data in enumerate(results_data):
results_table.setItem(row, 0, QtGui.QTableWidgetItem(data["part_number"]))
results_table.setItem(row, 1, QtGui.QTableWidgetItem(data["description"]))
results_table.setItem(row, 2, QtGui.QTableWidgetItem(data["item_type"]))
results_table.setItem(row, 3, QtGui.QTableWidgetItem(data["source"]))
results_table.setItem(row, 4, QtGui.QTableWidgetItem(data["modified"]))
results_table.resizeColumnsToContents()
_open_after_close = [None]
def open_selected():
selected = results_table.selectedItems()
if not selected:
return
row = selected[0].row()
_open_after_close[0] = dict(results_data[row])
dialog.accept()
search_input.textChanged.connect(lambda: do_search())
results_table.doubleClicked.connect(open_selected)
# Buttons
btn_layout = QtGui.QHBoxLayout()
open_btn = QtGui.QPushButton("Open")
open_btn.clicked.connect(open_selected)
cancel_btn = QtGui.QPushButton("Cancel")
cancel_btn.clicked.connect(dialog.reject)
btn_layout.addStretch()
btn_layout.addWidget(open_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
do_search()
dialog.exec_()
# Open the document AFTER the dialog has fully closed so that
# heavy document loads (especially Assembly files) don't run
# inside the dialog's nested event loop, which can cause crashes.
data = _open_after_close[0]
if data is not None:
if data.get("path"):
FreeCAD.openDocument(data["path"])
else:
_sync.open_item(data["part_number"])
def IsActive(self):
return True
class Silo_New:
"""Create new item with part number."""
def GetResources(self):
return {
"MenuText": "New",
"ToolTip": "Create new item (Ctrl+N)",
"Pixmap": _icon("new"),
}
def Activated(self):
from PySide import QtGui
sel = FreeCADGui.Selection.getSelection()
# Category selection
try:
schema = _client.get_schema()
categories = schema.get("segments", [])
cat_segment = next((s for s in categories if s.get("name") == "category"), None)
if cat_segment and cat_segment.get("values"):
cat_list = [f"{k} - {v}" for k, v in sorted(cat_segment["values"].items())]
category_str, ok = QtGui.QInputDialog.getItem(
None, "New Item", "Category:", cat_list, 0, False
)
if not ok:
return
category = category_str.split(" - ")[0]
else:
category, ok = QtGui.QInputDialog.getText(None, "New Item", "Category code:")
if not ok:
return
except Exception:
category, ok = QtGui.QInputDialog.getText(None, "New Item", "Category code:")
if not ok:
return
# Description
default_desc = sel[0].Label if sel else ""
description, ok = QtGui.QInputDialog.getText(
None, "New Item", "Description:", text=default_desc
)
if not ok:
return
# Optional project tagging
selected_projects = []
try:
projects = _client.get_projects()
if projects:
project_codes = [p.get("code", "") for p in projects if p.get("code")]
if project_codes:
# Multi-select dialog for projects
dialog = QtGui.QDialog()
dialog.setWindowTitle("Tag with Projects (Optional)")
dialog.setMinimumWidth(300)
layout = QtGui.QVBoxLayout(dialog)
label = QtGui.QLabel("Select projects to tag this item with:")
layout.addWidget(label)
list_widget = QtGui.QListWidget()
list_widget.setSelectionMode(QtGui.QAbstractItemView.MultiSelection)
for code in project_codes:
list_widget.addItem(code)
layout.addWidget(list_widget)
btn_layout = QtGui.QHBoxLayout()
skip_btn = QtGui.QPushButton("Skip")
ok_btn = QtGui.QPushButton("Tag Selected")
btn_layout.addWidget(skip_btn)
btn_layout.addWidget(ok_btn)
layout.addLayout(btn_layout)
skip_btn.clicked.connect(dialog.reject)
ok_btn.clicked.connect(dialog.accept)
if dialog.exec_() == QtGui.QDialog.Accepted:
selected_projects = [item.text() for item in list_widget.selectedItems()]
except Exception as e:
FreeCAD.Console.PrintWarning(f"Could not fetch projects: {e}\n")
try:
result = _client.create_item(
"kindred-rd",
category,
description,
projects=selected_projects if selected_projects else None,
)
part_number = result["part_number"]
if sel:
# Tag selected object
obj = sel[0]
set_silo_properties(
obj,
{
"SiloItemId": result.get("id", ""),
"SiloPartNumber": part_number,
"SiloRevision": 1,
},
)
obj.Label = part_number
_sync.save_to_canonical_path(FreeCAD.ActiveDocument, force_rename=True)
else:
# Create new document
_sync.create_document_for_item(result, save=True)
msg = f"Part number: {part_number}"
if selected_projects:
msg += f"\nTagged with projects: {', '.join(selected_projects)}"
FreeCAD.Console.PrintMessage(f"Created: {part_number}\n")
QtGui.QMessageBox.information(None, "Item Created", msg)
except Exception as e:
QtGui.QMessageBox.critical(None, "Error", str(e))
def IsActive(self):
return True
class Silo_Save:
"""Save locally and upload to MinIO."""
def GetResources(self):
return {
"MenuText": "Save",
"ToolTip": "Save locally and upload to MinIO (Ctrl+S)",
"Pixmap": _icon("save"),
}
def Activated(self):
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
# If not tracked, just do a regular FreeCAD save
if not obj:
if doc.FileName:
doc.save()
FreeCAD.Console.PrintMessage(f"Saved: {doc.FileName}\n")
else:
FreeCADGui.runCommand("Std_SaveAs", 0)
return
part_number = obj.SiloPartNumber
# Check if document has unsaved changes
gui_doc = FreeCADGui.getDocument(doc.Name)
is_modified = gui_doc.Modified if gui_doc else True
FreeCAD.Console.PrintMessage(f"[DEBUG] Modified={is_modified}, FileName={doc.FileName}\n")
if gui_doc and not is_modified and doc.FileName:
FreeCAD.Console.PrintMessage("No changes to save.\n")
return
# Collect properties BEFORE saving to avoid dirtying the document
# (accessing Shape properties can trigger recompute)
FreeCAD.Console.PrintMessage("[DEBUG] Collecting properties...\n")
properties = collect_document_properties(doc)
# Check modified state after collecting properties
is_modified_after_props = gui_doc.Modified if gui_doc else True
FreeCAD.Console.PrintMessage(
f"[DEBUG] After collect_properties: Modified={is_modified_after_props}\n"
)
# Save locally
FreeCAD.Console.PrintMessage("[DEBUG] Saving to canonical path...\n")
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
if not file_path:
# Fallback to regular save if canonical path fails
if doc.FileName:
doc.save()
file_path = Path(doc.FileName)
else:
FreeCAD.Console.PrintError("Could not determine save path\n")
return
# Check modified state after save
is_modified_after_save = gui_doc.Modified if gui_doc else True
FreeCAD.Console.PrintMessage(f"[DEBUG] After save: Modified={is_modified_after_save}\n")
# Force clear modified flag if save succeeded (needed for assemblies)
if is_modified_after_save and gui_doc:
FreeCAD.Console.PrintMessage("[DEBUG] Attempting to clear Modified flag...\n")
try:
gui_doc.Modified = False
FreeCAD.Console.PrintMessage(
f"[DEBUG] After force clear: Modified={gui_doc.Modified}\n"
)
except Exception as e:
FreeCAD.Console.PrintMessage(f"[DEBUG] Could not clear Modified: {e}\n")
FreeCAD.Console.PrintMessage(f"Saved: {file_path}\n")
# Try to upload to MinIO
try:
result = _client._upload_file(part_number, str(file_path), properties, "Auto-save")
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(f"Uploaded as revision {new_rev}\n")
# Check modified state after upload
is_modified_after_upload = gui_doc.Modified if gui_doc else True
FreeCAD.Console.PrintMessage(
f"[DEBUG] After upload: Modified={is_modified_after_upload}\n"
)
except Exception as e:
FreeCAD.Console.PrintWarning(f"Upload failed: {e}\n")
FreeCAD.Console.PrintMessage("File saved locally but not uploaded.\n")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class Silo_Commit:
"""Save as new revision with comment."""
def GetResources(self):
return {
"MenuText": "Commit",
"ToolTip": "Save as new revision with comment (Ctrl+Shift+S)",
"Pixmap": _icon("commit"),
}
def Activated(self):
from PySide import QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked object. Use 'New' to register first.\n")
return
part_number = obj.SiloPartNumber
comment, ok = QtGui.QInputDialog.getText(None, "Commit", "Revision comment:")
if not ok:
return
# Collect properties BEFORE saving to avoid dirtying the document
properties = collect_document_properties(doc)
try:
file_path = _sync.save_to_canonical_path(doc, force_rename=True)
if not file_path:
return
result = _client._upload_file(part_number, str(file_path), properties, comment)
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(f"Committed revision {new_rev}: {comment}\n")
except Exception as e:
FreeCAD.Console.PrintError(f"Commit failed: {e}\n")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
def _check_pull_conflicts(part_number, local_path, doc=None):
"""Check for conflicts between local file and server state.
Returns a list of conflict description strings, or an empty list if clean.
"""
conflicts = []
# Check for unsaved changes in an open document
if doc is not None and doc.IsModified():
conflicts.append("Document has unsaved local changes.")
# Check local revision vs server latest
if doc is not None:
obj = get_tracked_object(doc)
if obj and hasattr(obj, "SiloRevision"):
local_rev = getattr(obj, "SiloRevision", 0)
latest = _client.latest_file_revision(part_number)
if latest and local_rev and latest["revision_number"] > local_rev:
conflicts.append(
f"Local file is at revision {local_rev}, "
f"server has revision {latest['revision_number']}."
)
# Check local file mtime vs server timestamp
if local_path and local_path.exists():
import datetime
local_mtime = datetime.datetime.fromtimestamp(
local_path.stat().st_mtime, tz=datetime.timezone.utc
)
try:
item = _client.get_item(part_number)
server_updated = item.get("updated_at", "")
if server_updated:
# Parse ISO format timestamp
server_dt = datetime.datetime.fromisoformat(server_updated.replace("Z", "+00:00"))
if server_dt > local_mtime:
conflicts.append("Server version is newer than local file.")
except Exception:
pass
return conflicts
class SiloPullDialog:
"""Dialog for selecting which revision to pull."""
def __init__(self, part_number, revisions, parent=None):
from PySide import QtCore, QtGui
self._selected_revision = None
self._dialog = QtGui.QDialog(parent)
self._dialog.setWindowTitle(f"Pull - {part_number}")
self._dialog.setMinimumWidth(600)
self._dialog.setMinimumHeight(350)
layout = QtGui.QVBoxLayout(self._dialog)
info = QtGui.QLabel(f"Select a revision to download for {part_number}:")
layout.addWidget(info)
# Revision table
self._table = QtGui.QTableWidget()
self._table.setColumnCount(5)
self._table.setHorizontalHeaderLabels(["Rev", "Date", "Comment", "Status", "File"])
self._table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
self._table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
self._table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
self._table.verticalHeader().setVisible(False)
header = self._table.horizontalHeader()
header.setStretchLastSection(True)
# Populate rows
file_revisions = []
self._table.setRowCount(len(revisions))
for i, rev in enumerate(revisions):
rev_num = rev.get("revision_number", "")
date = rev.get("created_at", "")[:16].replace("T", " ")
comment = rev.get("comment", "")
status = rev.get("status", "")
has_file = "\u2713" if rev.get("file_key") else ""
self._table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev_num)))
self._table.setItem(i, 1, QtGui.QTableWidgetItem(date))
self._table.setItem(i, 2, QtGui.QTableWidgetItem(comment))
self._table.setItem(i, 3, QtGui.QTableWidgetItem(status))
file_item = QtGui.QTableWidgetItem(has_file)
file_item.setTextAlignment(QtCore.Qt.AlignCenter)
self._table.setItem(i, 4, file_item)
if rev.get("file_key"):
file_revisions.append(i)
self._table.resizeColumnsToContents()
layout.addWidget(self._table)
# Pre-select the latest revision with a file
if file_revisions:
self._table.selectRow(file_revisions[0])
# Store revision data for lookup
self._revisions = revisions
# Buttons
btn_layout = QtGui.QHBoxLayout()
btn_layout.addStretch()
download_btn = QtGui.QPushButton("Download")
cancel_btn = QtGui.QPushButton("Cancel")
download_btn.clicked.connect(self._on_download)
cancel_btn.clicked.connect(self._dialog.reject)
self._table.doubleClicked.connect(self._on_download)
btn_layout.addWidget(download_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
def _on_download(self):
row = self._table.currentRow()
if row < 0:
return
rev = self._revisions[row]
if not rev.get("file_key"):
from PySide import QtGui
QtGui.QMessageBox.information(
self._dialog, "Pull", "Selected revision has no file attached."
)
return
self._selected_revision = rev
self._dialog.accept()
def exec_(self):
if self._dialog.exec_() == 1: # QDialog.Accepted
return self._selected_revision
return None
class Silo_Pull:
"""Download from MinIO / sync from database."""
def GetResources(self):
return {
"MenuText": "Pull",
"ToolTip": "Download from MinIO with revision selection",
"Pixmap": _icon("pull"),
}
def Activated(self):
from PySide import QtGui
doc = FreeCAD.ActiveDocument
part_number = None
obj = None
if doc:
obj = get_tracked_object(doc)
if obj:
part_number = obj.SiloPartNumber
if not part_number:
part_number, ok = QtGui.QInputDialog.getText(None, "Pull", "Part number:")
if not ok or not part_number:
return
part_number = part_number.strip().upper()
# Fetch revisions from server
try:
revisions = _client.get_revisions(part_number)
except Exception as e:
QtGui.QMessageBox.warning(None, "Pull", f"Cannot reach server: {e}")
return
existing_local = find_file_by_part_number(part_number)
# If no revisions have files, fall back to create-from-database
has_any_file = any(r.get("file_key") for r in revisions)
if not has_any_file:
if existing_local:
FreeCAD.Console.PrintMessage(f"Opening existing local file: {existing_local}\n")
FreeCAD.openDocument(str(existing_local))
else:
try:
item = _client.get_item(part_number)
new_doc = _sync.create_document_for_item(item, save=True)
if new_doc:
FreeCAD.Console.PrintMessage(f"Created local file for {part_number}\n")
else:
QtGui.QMessageBox.warning(
None,
"Pull",
f"Failed to create document for {part_number}",
)
except Exception as e:
QtGui.QMessageBox.warning(None, "Pull", f"Failed: {e}")
return
# Conflict detection
conflicts = _check_pull_conflicts(part_number, existing_local, doc)
if conflicts:
detail = "\n".join(f" - {c}" for c in conflicts)
reply = QtGui.QMessageBox.warning(
None,
"Pull - Conflicts Detected",
f"Potential conflicts found:\n{detail}\n\n"
"Download anyway and overwrite local file?",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply != QtGui.QMessageBox.Yes:
return
# Show revision selection dialog
dlg = SiloPullDialog(part_number, revisions)
selected = dlg.exec_()
if selected is None:
return
rev_num = selected["revision_number"]
# Determine destination path
try:
item = _client.get_item(part_number)
except Exception:
item = {}
dest_path = get_cad_file_path(part_number, item.get("description", ""))
dest_path.parent.mkdir(parents=True, exist_ok=True)
# Download with progress
progress = QtGui.QProgressDialog(
f"Downloading {part_number} rev {rev_num}...", "Cancel", 0, 100
)
progress.setWindowModality(2) # Qt.WindowModal
progress.setMinimumDuration(0)
progress.setValue(0)
def on_progress(downloaded, total):
if progress.wasCanceled():
return
if total > 0:
pct = int(downloaded * 100 / total)
progress.setValue(min(pct, 99))
else:
# Indeterminate - pulse between 0-90
progress.setValue(min(int(downloaded / 1024) % 90, 89))
QtGui.QApplication.processEvents()
try:
ok = _client._download_file(
part_number, rev_num, str(dest_path), progress_callback=on_progress
)
except Exception as e:
progress.close()
QtGui.QMessageBox.warning(None, "Pull", f"Download failed: {e}")
return
progress.setValue(100)
progress.close()
if not ok:
QtGui.QMessageBox.warning(None, "Pull", "Download failed.")
return
FreeCAD.Console.PrintMessage(f"Pulled revision {rev_num} of {part_number}\n")
# Close existing document if open, then reopen
if doc and doc.FileName == str(dest_path):
FreeCAD.closeDocument(doc.Name)
FreeCAD.openDocument(str(dest_path))
# Update SiloRevision property on the tracked object
new_doc = FreeCAD.ActiveDocument
if new_doc:
new_obj = get_tracked_object(new_doc)
if new_obj and hasattr(new_obj, "SiloRevision"):
new_obj.SiloRevision = rev_num
new_doc.save()
def IsActive(self):
return True
class Silo_Push:
"""Upload local files to MinIO."""
def GetResources(self):
return {
"MenuText": "Push",
"ToolTip": "Upload local files that aren't in MinIO",
"Pixmap": _icon("push"),
}
def Activated(self):
from datetime import datetime, timezone
from PySide import QtGui
# Find files that need uploading (no server file, or local is newer)
local_files = search_local_files()
unuploaded = []
for lf in local_files:
pn = lf["part_number"]
try:
_client.get_item(pn) # Check if in DB
server_rev = _client.latest_file_revision(pn)
if not server_rev:
# No file on server at all
unuploaded.append(lf)
else:
# Compare local mtime against server revision timestamp
try:
local_mtime = os.path.getmtime(lf["path"])
server_time_str = server_rev.get("created_at", "")
if server_time_str:
server_dt = datetime.fromisoformat(
server_time_str.replace("Z", "+00:00")
)
local_dt = datetime.fromtimestamp(local_mtime, tz=timezone.utc)
if local_dt > server_dt:
unuploaded.append(lf)
else:
# Can't parse server time, assume needs upload
unuploaded.append(lf)
except Exception:
# On any comparison error, include it
unuploaded.append(lf)
except Exception:
pass # Not in DB, skip
if not unuploaded:
QtGui.QMessageBox.information(None, "Push", "All local files are already uploaded.")
return
msg = f"Found {len(unuploaded)} files to upload:\n\n"
for item in unuploaded[:10]:
msg += f" {item['part_number']}\n"
if len(unuploaded) > 10:
msg += f" ... and {len(unuploaded) - 10} more\n"
msg += "\nUpload?"
reply = QtGui.QMessageBox.question(
None, "Push", msg, QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
)
if reply != QtGui.QMessageBox.Yes:
return
uploaded = 0
for item in unuploaded:
result = _sync.upload_file(item["part_number"], item["path"], "Synced from local")
if result:
uploaded += 1
QtGui.QMessageBox.information(None, "Push", f"Uploaded {uploaded} files.")
def IsActive(self):
return True
class Silo_Info:
"""Show item status and revision history."""
def GetResources(self):
return {
"MenuText": "Info",
"ToolTip": "Show item status and revision history",
"Pixmap": _icon("info"),
}
def Activated(self):
from PySide import QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked object\n")
return
part_number = obj.SiloPartNumber
try:
item = _client.get_item(part_number)
revisions = _client.get_revisions(part_number)
# Get projects for item
try:
projects = _client.get_item_projects(part_number)
project_codes = [p.get("code", "") for p in projects if p.get("code")]
except Exception:
project_codes = []
msg = f"<h3>{part_number}</h3>"
msg += f"<p><b>Type:</b> {item.get('item_type', '-')}</p>"
msg += f"<p><b>Description:</b> {item.get('description', '-')}</p>"
msg += (
f"<p><b>Projects:</b> {', '.join(project_codes) if project_codes else 'None'}</p>"
)
msg += f"<p><b>Current Revision:</b> {item.get('current_revision', 1)}</p>"
msg += f"<p><b>Local Revision:</b> {getattr(obj, 'SiloRevision', '-')}</p>"
has_file, _ = _client.has_file(part_number)
msg += f"<p><b>File in MinIO:</b> {'Yes' if has_file else 'No'}</p>"
# Show current revision status
if revisions:
current_status = revisions[0].get("status", "draft")
current_labels = revisions[0].get("labels", [])
msg += f"<p><b>Current Status:</b> {current_status}</p>"
if current_labels:
msg += f"<p><b>Labels:</b> {', '.join(current_labels)}</p>"
msg += "<h4>Revision History</h4><table border='1' cellpadding='4'>"
msg += "<tr><th>Rev</th><th>Status</th><th>Date</th><th>File</th><th>Comment</th></tr>"
for rev in revisions:
file_icon = "" if rev.get("file_key") else "-"
comment = rev.get("comment", "") or "-"
date = rev.get("created_at", "")[:10]
status = rev.get("status", "draft")
msg += f"<tr><td>{rev['revision_number']}</td><td>{status}</td><td>{date}</td><td>{file_icon}</td><td>{comment}</td></tr>"
msg += "</table>"
dialog = QtGui.QMessageBox()
dialog.setWindowTitle("Item Info")
dialog.setTextFormat(QtGui.Qt.RichText)
dialog.setText(msg)
dialog.exec_()
except Exception as e:
QtGui.QMessageBox.warning(None, "Info", f"Failed to get info: {e}")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class Silo_TagProjects:
"""Manage project tags for an item."""
def GetResources(self):
return {
"MenuText": "Tag Projects",
"ToolTip": "Add or remove project tags for an item",
"Pixmap": _icon("tag"),
}
def Activated(self):
from PySide import QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked object\n")
return
part_number = obj.SiloPartNumber
try:
# Get current projects for item
current_projects = _client.get_item_projects(part_number)
current_codes = {p.get("code", "") for p in current_projects if p.get("code")}
# Get all available projects
all_projects = _client.get_projects()
all_codes = [p.get("code", "") for p in all_projects if p.get("code")]
if not all_codes:
QtGui.QMessageBox.information(
None,
"Tag Projects",
"No projects available. Create projects first.",
)
return
# Multi-select dialog
dialog = QtGui.QDialog()
dialog.setWindowTitle(f"Tag Projects for {part_number}")
dialog.setMinimumWidth(350)
layout = QtGui.QVBoxLayout(dialog)
label = QtGui.QLabel("Select projects to associate with this item:")
layout.addWidget(label)
list_widget = QtGui.QListWidget()
list_widget.setSelectionMode(QtGui.QAbstractItemView.MultiSelection)
for code in all_codes:
item = QtGui.QListWidgetItem(code)
list_widget.addItem(item)
if code in current_codes:
item.setSelected(True)
layout.addWidget(list_widget)
btn_layout = QtGui.QHBoxLayout()
cancel_btn = QtGui.QPushButton("Cancel")
save_btn = QtGui.QPushButton("Save")
btn_layout.addWidget(cancel_btn)
btn_layout.addWidget(save_btn)
layout.addLayout(btn_layout)
cancel_btn.clicked.connect(dialog.reject)
save_btn.clicked.connect(dialog.accept)
if dialog.exec_() == QtGui.QDialog.Accepted:
selected = [item.text() for item in list_widget.selectedItems()]
# Add new tags
to_add = [c for c in selected if c not in current_codes]
if to_add:
_client.add_item_projects(part_number, to_add)
# Note: removing tags would require a separate API call per project
# For simplicity, we just add new ones here
msg = f"Updated project tags for {part_number}"
if to_add:
msg += f"\nAdded: {', '.join(to_add)}"
QtGui.QMessageBox.information(None, "Tag Projects", msg)
FreeCAD.Console.PrintMessage(f"{msg}\n")
except Exception as e:
QtGui.QMessageBox.warning(None, "Tag Projects", f"Failed: {e}")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class Silo_Rollback:
"""Rollback to a previous revision."""
def GetResources(self):
return {
"MenuText": "Rollback",
"ToolTip": "Rollback to a previous revision",
"Pixmap": _icon("rollback"),
}
def Activated(self):
from PySide import QtCore, QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked object\n")
return
part_number = obj.SiloPartNumber
try:
revisions = _client.get_revisions(part_number)
if len(revisions) < 2:
QtGui.QMessageBox.information(
None, "Rollback", "No previous revisions to rollback to."
)
return
# Build revision list for selection (exclude current/latest)
current_rev = revisions[0]["revision_number"]
prev_revisions = revisions[1:] # All except latest
# Create selection dialog
dialog = QtGui.QDialog()
dialog.setWindowTitle(f"Rollback {part_number}")
dialog.setMinimumWidth(500)
dialog.setMinimumHeight(300)
layout = QtGui.QVBoxLayout(dialog)
label = QtGui.QLabel(f"Select a revision to rollback to (current: Rev {current_rev}):")
layout.addWidget(label)
# Revision table
table = QtGui.QTableWidget()
table.setColumnCount(4)
table.setHorizontalHeaderLabels(["Rev", "Status", "Date", "Comment"])
table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
table.setRowCount(len(prev_revisions))
table.horizontalHeader().setStretchLastSection(True)
for i, rev in enumerate(prev_revisions):
table.setItem(i, 0, QtGui.QTableWidgetItem(str(rev["revision_number"])))
table.setItem(i, 1, QtGui.QTableWidgetItem(rev.get("status", "draft")))
table.setItem(i, 2, QtGui.QTableWidgetItem(rev.get("created_at", "")[:10]))
table.setItem(i, 3, QtGui.QTableWidgetItem(rev.get("comment", "") or ""))
table.resizeColumnsToContents()
layout.addWidget(table)
# Comment field
comment_label = QtGui.QLabel("Rollback comment (optional):")
layout.addWidget(comment_label)
comment_input = QtGui.QLineEdit()
comment_input.setPlaceholderText("Reason for rollback...")
layout.addWidget(comment_input)
# Buttons
btn_layout = QtGui.QHBoxLayout()
cancel_btn = QtGui.QPushButton("Cancel")
rollback_btn = QtGui.QPushButton("Rollback")
btn_layout.addStretch()
btn_layout.addWidget(cancel_btn)
btn_layout.addWidget(rollback_btn)
layout.addLayout(btn_layout)
selected_rev = [None]
def on_rollback():
selected = table.selectedItems()
if not selected:
QtGui.QMessageBox.warning(dialog, "Rollback", "Please select a revision")
return
selected_rev[0] = int(table.item(selected[0].row(), 0).text())
dialog.accept()
cancel_btn.clicked.connect(dialog.reject)
rollback_btn.clicked.connect(on_rollback)
if dialog.exec_() == QtGui.QDialog.Accepted and selected_rev[0]:
target_rev = selected_rev[0]
comment = comment_input.text().strip()
# Confirm
reply = QtGui.QMessageBox.question(
None,
"Confirm Rollback",
f"Create new revision by rolling back to Rev {target_rev}?\n\n"
"This will copy properties and file reference from the selected revision.",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply != QtGui.QMessageBox.Yes:
return
# Perform rollback
result = _client.rollback_revision(part_number, target_rev, comment)
new_rev = result["revision_number"]
FreeCAD.Console.PrintMessage(
f"Created revision {new_rev} (rollback from {target_rev})\n"
)
QtGui.QMessageBox.information(
None,
"Rollback Complete",
f"Created revision {new_rev} from rollback to Rev {target_rev}.\n\n"
"Use 'Pull' to download the rolled-back file.",
)
except Exception as e:
QtGui.QMessageBox.warning(None, "Rollback", f"Failed: {e}")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class Silo_SetStatus:
"""Set revision status (draft, review, released, obsolete)."""
def GetResources(self):
return {
"MenuText": "Set Status",
"ToolTip": "Set the status of the current revision",
"Pixmap": _icon("status"),
}
def Activated(self):
from PySide import QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked object\n")
return
part_number = obj.SiloPartNumber
local_rev = getattr(obj, "SiloRevision", 1)
try:
# Get current revision info
revisions = _client.get_revisions(part_number)
current_rev = revisions[0] if revisions else None
if not current_rev:
QtGui.QMessageBox.warning(None, "Set Status", "No revisions found")
return
current_status = current_rev.get("status", "draft")
rev_num = current_rev["revision_number"]
# Status selection
statuses = ["draft", "review", "released", "obsolete"]
status, ok = QtGui.QInputDialog.getItem(
None,
"Set Revision Status",
f"Set status for Rev {rev_num} (current: {current_status}):",
statuses,
statuses.index(current_status),
False,
)
if not ok or status == current_status:
return
# Update status
_client.update_revision(part_number, rev_num, status=status)
FreeCAD.Console.PrintMessage(f"Updated Rev {rev_num} status to '{status}'\n")
QtGui.QMessageBox.information(
None, "Status Updated", f"Revision {rev_num} status set to '{status}'"
)
except Exception as e:
QtGui.QMessageBox.warning(None, "Set Status", f"Failed: {e}")
def IsActive(self):
return FreeCAD.ActiveDocument is not None
class Silo_Settings:
"""Configure Silo connection settings."""
def GetResources(self):
return {
"MenuText": "Settings",
"ToolTip": "Configure Silo API URL and SSL settings",
"Pixmap": _icon("info"),
}
def Activated(self):
from PySide import QtCore, QtGui
param = FreeCAD.ParamGet(_PREF_GROUP)
dialog = QtGui.QDialog()
dialog.setWindowTitle("Silo Settings")
dialog.setMinimumWidth(450)
layout = QtGui.QVBoxLayout(dialog)
# URL
url_label = QtGui.QLabel("Silo API URL:")
layout.addWidget(url_label)
url_input = QtGui.QLineEdit()
url_input.setPlaceholderText("http://localhost:8080/api")
current_url = param.GetString("ApiUrl", "")
if current_url:
url_input.setText(current_url)
else:
env_url = os.environ.get("SILO_API_URL", "")
if env_url:
url_input.setText(env_url)
layout.addWidget(url_input)
url_hint = QtGui.QLabel(
"Full URL with path (e.g. http://localhost:8080/api) or just the "
"hostname (e.g. https://silo.kindred.internal) and /api is "
"appended automatically. Leave empty for SILO_API_URL env var."
)
url_hint.setWordWrap(True)
url_hint.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(url_hint)
layout.addSpacing(10)
# SSL
ssl_checkbox = QtGui.QCheckBox("Verify SSL certificates")
ssl_checkbox.setChecked(param.GetBool("SslVerify", True))
layout.addWidget(ssl_checkbox)
ssl_hint = QtGui.QLabel("Disable only for internal servers with self-signed certificates.")
ssl_hint.setWordWrap(True)
ssl_hint.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(ssl_hint)
layout.addSpacing(5)
# Custom CA certificate
cert_label = QtGui.QLabel("Custom CA certificate file:")
layout.addWidget(cert_label)
cert_row = QtGui.QHBoxLayout()
cert_input = QtGui.QLineEdit()
cert_input.setPlaceholderText("(Use system CA certificates)")
current_cert = param.GetString("SslCertPath", "")
if current_cert:
cert_input.setText(current_cert)
cert_browse = QtGui.QPushButton("Browse...")
cert_row.addWidget(cert_input)
cert_row.addWidget(cert_browse)
layout.addLayout(cert_row)
cert_hint = QtGui.QLabel(
"Path to a PEM/CRT file for internal CAs. Leave empty for system certificates only."
)
cert_hint.setWordWrap(True)
cert_hint.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(cert_hint)
def on_browse_cert():
path, _ = QtGui.QFileDialog.getOpenFileName(
dialog,
"Select CA Certificate",
os.path.dirname(cert_input.text()) or "/etc/ssl/certs",
"Certificates (*.pem *.crt *.cer);;All Files (*)",
)
if path:
cert_input.setText(path)
cert_browse.clicked.connect(on_browse_cert)
layout.addSpacing(10)
# Authentication section
auth_heading = QtGui.QLabel("<b>Authentication</b>")
auth_heading.setTextFormat(QtCore.Qt.RichText)
layout.addWidget(auth_heading)
auth_user = _get_auth_username()
auth_role = _get_auth_role()
auth_source = _get_auth_source()
has_token = bool(_get_auth_token())
if has_token and auth_user:
auth_parts = [f"Logged in as <b>{auth_user}</b>"]
if auth_role:
auth_parts.append(f"(role: {auth_role})")
if auth_source:
auth_parts.append(f"via {auth_source}")
auth_status_text = " ".join(auth_parts)
else:
auth_status_text = "Not logged in"
auth_status_lbl = QtGui.QLabel(auth_status_text)
auth_status_lbl.setTextFormat(QtCore.Qt.RichText)
layout.addWidget(auth_status_lbl)
# API token input
token_label = QtGui.QLabel("API Token:")
layout.addWidget(token_label)
token_row = QtGui.QHBoxLayout()
token_input = QtGui.QLineEdit()
token_input.setEchoMode(QtGui.QLineEdit.Password)
token_input.setPlaceholderText("silo_... (paste token or use Login)")
current_token = param.GetString("ApiToken", "")
if current_token:
token_input.setText(current_token)
token_row.addWidget(token_input)
token_show_btn = QtGui.QToolButton()
token_show_btn.setText("\U0001f441")
token_show_btn.setCheckable(True)
token_show_btn.setFixedSize(28, 28)
token_show_btn.setToolTip("Show/hide token")
def on_toggle_show(checked):
if checked:
token_input.setEchoMode(QtGui.QLineEdit.Normal)
else:
token_input.setEchoMode(QtGui.QLineEdit.Password)
token_show_btn.toggled.connect(on_toggle_show)
token_row.addWidget(token_show_btn)
layout.addLayout(token_row)
token_hint = QtGui.QLabel(
"Paste an API token generated from the Silo web UI, "
"or use Login in the Database Auth panel to create one "
"automatically. Tokens can also be set via the "
"SILO_API_TOKEN environment variable."
)
token_hint.setWordWrap(True)
token_hint.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(token_hint)
layout.addSpacing(4)
clear_auth_btn = QtGui.QPushButton("Clear Token and Logout")
clear_auth_btn.setEnabled(has_token)
def on_clear_auth():
_clear_auth()
token_input.setText("")
auth_status_lbl.setText("Not logged in")
clear_auth_btn.setEnabled(False)
FreeCAD.Console.PrintMessage("Silo: API token and credentials cleared\n")
clear_auth_btn.clicked.connect(on_clear_auth)
layout.addWidget(clear_auth_btn)
layout.addSpacing(10)
# Current effective values (read-only)
cert_display = param.GetString("SslCertPath", "") or "(system defaults)"
if has_token and auth_user:
auth_display = f"{auth_user} ({auth_role or 'unknown role'})"
if auth_source:
auth_display += f" via {auth_source}"
elif has_token:
auth_display = "token configured (user unknown)"
else:
auth_display = "not configured"
status_label = QtGui.QLabel(
f"<b>Active URL:</b> {_get_api_url()}<br>"
f"<b>SSL verification:</b> {'enabled' if _get_ssl_verify() else 'disabled'}<br>"
f"<b>CA certificate:</b> {cert_display}<br>"
f"<b>Authentication:</b> {auth_display}"
)
status_label.setTextFormat(QtCore.Qt.RichText)
layout.addWidget(status_label)
layout.addStretch()
# Buttons
btn_layout = QtGui.QHBoxLayout()
save_btn = QtGui.QPushButton("Save")
cancel_btn = QtGui.QPushButton("Cancel")
btn_layout.addStretch()
btn_layout.addWidget(save_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
def on_save():
url = url_input.text().strip()
param.SetString("ApiUrl", url)
param.SetBool("SslVerify", ssl_checkbox.isChecked())
cert_path = cert_input.text().strip()
param.SetString("SslCertPath", cert_path)
# Save API token if changed
new_token = token_input.text().strip()
old_token = param.GetString("ApiToken", "")
if new_token != old_token:
param.SetString("ApiToken", new_token)
if new_token and not old_token:
FreeCAD.Console.PrintMessage("Silo: API token configured\n")
elif not new_token and old_token:
_clear_auth()
FreeCAD.Console.PrintMessage("Silo: API token removed\n")
else:
FreeCAD.Console.PrintMessage("Silo: API token updated\n")
FreeCAD.Console.PrintMessage(
f"Silo settings saved. URL: {_get_api_url()}, "
f"SSL verify: {_get_ssl_verify()}, "
f"Cert: {cert_path or '(system)'}\n"
)
dialog.accept()
save_btn.clicked.connect(on_save)
cancel_btn.clicked.connect(dialog.reject)
dialog.exec_()
def IsActive(self):
return True
class Silo_BOM:
"""View and manage Bill of Materials for the current item."""
def GetResources(self):
return {
"MenuText": "BOM",
"ToolTip": "View and manage Bill of Materials",
"Pixmap": _icon("bom"),
}
def Activated(self):
from PySide import QtCore, QtGui
doc = FreeCAD.ActiveDocument
if not doc:
FreeCAD.Console.PrintError("No active document\n")
return
obj = get_tracked_object(doc)
if not obj:
FreeCAD.Console.PrintError("No tracked Silo item in active document.\n")
from PySide import QtGui as _qg
_qg.QMessageBox.warning(
None,
"BOM",
"This document is not registered with Silo.\nUse Silo > New to register it first.",
)
return
part_number = obj.SiloPartNumber
try:
item = _client.get_item(part_number)
except Exception as e:
QtGui.QMessageBox.warning(None, "BOM", f"Failed to get item info:\n{e}")
return
# Build the dialog
dialog = QtGui.QDialog()
dialog.setWindowTitle(f"BOM - {part_number}")
dialog.setMinimumWidth(750)
dialog.setMinimumHeight(450)
layout = QtGui.QVBoxLayout(dialog)
# Item header
header = QtGui.QLabel(f"<b>{part_number}</b> - {item.get('description', '')}")
layout.addWidget(header)
# Tab widget
tabs = QtGui.QTabWidget()
layout.addWidget(tabs)
# ── Tab 1: BOM (children of this item) ──
bom_widget = QtGui.QWidget()
bom_layout = QtGui.QVBoxLayout(bom_widget)
bom_table = QtGui.QTableWidget()
bom_table.setColumnCount(7)
bom_table.setHorizontalHeaderLabels(
["Part Number", "Description", "Type", "Qty", "Unit", "Ref Des", "Rev"]
)
bom_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
bom_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
bom_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
bom_table.horizontalHeader().setStretchLastSection(True)
bom_layout.addWidget(bom_table)
# BOM button bar
bom_btn_layout = QtGui.QHBoxLayout()
add_btn = QtGui.QPushButton("Add")
edit_btn = QtGui.QPushButton("Edit")
remove_btn = QtGui.QPushButton("Remove")
bom_btn_layout.addWidget(add_btn)
bom_btn_layout.addWidget(edit_btn)
bom_btn_layout.addWidget(remove_btn)
bom_btn_layout.addStretch()
bom_layout.addLayout(bom_btn_layout)
tabs.addTab(bom_widget, "BOM")
# ── Tab 2: Where Used (parents of this item) ──
wu_widget = QtGui.QWidget()
wu_layout = QtGui.QVBoxLayout(wu_widget)
wu_table = QtGui.QTableWidget()
wu_table.setColumnCount(5)
wu_table.setHorizontalHeaderLabels(["Parent Part Number", "Type", "Qty", "Unit", "Ref Des"])
wu_table.setSelectionBehavior(QtGui.QAbstractItemView.SelectRows)
wu_table.setSelectionMode(QtGui.QAbstractItemView.SingleSelection)
wu_table.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
wu_table.horizontalHeader().setStretchLastSection(True)
wu_layout.addWidget(wu_table)
tabs.addTab(wu_widget, "Where Used")
# ── Data loading ──
bom_data = []
def load_bom():
nonlocal bom_data
try:
bom_data = _client.get_bom(part_number)
except Exception as exc:
FreeCAD.Console.PrintWarning(f"BOM load error: {exc}\n")
bom_data = []
bom_table.setRowCount(len(bom_data))
for row, entry in enumerate(bom_data):
bom_table.setItem(
row, 0, QtGui.QTableWidgetItem(entry.get("child_part_number", ""))
)
bom_table.setItem(
row, 1, QtGui.QTableWidgetItem(entry.get("child_description", ""))
)
bom_table.setItem(row, 2, QtGui.QTableWidgetItem(entry.get("rel_type", "")))
qty = entry.get("quantity")
bom_table.setItem(
row, 3, QtGui.QTableWidgetItem(str(qty) if qty is not None else "")
)
bom_table.setItem(row, 4, QtGui.QTableWidgetItem(entry.get("unit") or ""))
ref_des = entry.get("reference_designators") or []
bom_table.setItem(row, 5, QtGui.QTableWidgetItem(", ".join(ref_des)))
bom_table.setItem(
row,
6,
QtGui.QTableWidgetItem(str(entry.get("effective_revision", ""))),
)
bom_table.resizeColumnsToContents()
def load_where_used():
try:
wu_data = _client.get_bom_where_used(part_number)
except Exception as exc:
FreeCAD.Console.PrintWarning(f"Where-used load error: {exc}\n")
wu_data = []
wu_table.setRowCount(len(wu_data))
for row, entry in enumerate(wu_data):
wu_table.setItem(
row, 0, QtGui.QTableWidgetItem(entry.get("parent_part_number", ""))
)
wu_table.setItem(row, 1, QtGui.QTableWidgetItem(entry.get("rel_type", "")))
qty = entry.get("quantity")
wu_table.setItem(
row, 2, QtGui.QTableWidgetItem(str(qty) if qty is not None else "")
)
wu_table.setItem(row, 3, QtGui.QTableWidgetItem(entry.get("unit") or ""))
ref_des = entry.get("reference_designators") or []
wu_table.setItem(row, 4, QtGui.QTableWidgetItem(", ".join(ref_des)))
wu_table.resizeColumnsToContents()
# ── Button handlers ──
def on_add():
add_dlg = QtGui.QDialog(dialog)
add_dlg.setWindowTitle("Add BOM Entry")
add_dlg.setMinimumWidth(400)
al = QtGui.QFormLayout(add_dlg)
child_input = QtGui.QLineEdit()
child_input.setPlaceholderText("e.g. F01-0001")
al.addRow("Child Part Number:", child_input)
type_combo = QtGui.QComboBox()
type_combo.addItems(["component", "alternate", "reference"])
al.addRow("Relationship Type:", type_combo)
qty_input = QtGui.QLineEdit()
qty_input.setPlaceholderText("e.g. 4")
al.addRow("Quantity:", qty_input)
unit_input = QtGui.QLineEdit()
unit_input.setPlaceholderText("e.g. ea, m, kg")
al.addRow("Unit:", unit_input)
refdes_input = QtGui.QLineEdit()
refdes_input.setPlaceholderText("e.g. R1, R2, R3")
al.addRow("Ref Designators:", refdes_input)
btn_box = QtGui.QDialogButtonBox(
QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel
)
btn_box.accepted.connect(add_dlg.accept)
btn_box.rejected.connect(add_dlg.reject)
al.addRow(btn_box)
if add_dlg.exec_() != QtGui.QDialog.Accepted:
return
child_pn = child_input.text().strip()
if not child_pn:
return
qty = None
qty_text = qty_input.text().strip()
if qty_text:
try:
qty = float(qty_text)
except ValueError:
QtGui.QMessageBox.warning(dialog, "BOM", "Quantity must be a number.")
return
unit = unit_input.text().strip() or None
rel_type = type_combo.currentText()
ref_des = None
refdes_text = refdes_input.text().strip()
if refdes_text:
ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()]
try:
_client.add_bom_entry(
part_number,
child_pn,
quantity=qty,
unit=unit,
rel_type=rel_type,
ref_des=ref_des,
)
load_bom()
except Exception as exc:
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to add entry:\n{exc}")
def on_edit():
selected = bom_table.selectedItems()
if not selected:
return
row = selected[0].row()
if row < 0 or row >= len(bom_data):
return
entry = bom_data[row]
child_pn = entry.get("child_part_number", "")
edit_dlg = QtGui.QDialog(dialog)
edit_dlg.setWindowTitle(f"Edit BOM Entry - {child_pn}")
edit_dlg.setMinimumWidth(400)
el = QtGui.QFormLayout(edit_dlg)
type_combo = QtGui.QComboBox()
type_combo.addItems(["component", "alternate", "reference"])
current_type = entry.get("rel_type", "component")
idx = type_combo.findText(current_type)
if idx >= 0:
type_combo.setCurrentIndex(idx)
el.addRow("Relationship Type:", type_combo)
qty_input = QtGui.QLineEdit()
qty = entry.get("quantity")
if qty is not None:
qty_input.setText(str(qty))
el.addRow("Quantity:", qty_input)
unit_input = QtGui.QLineEdit()
unit_input.setText(entry.get("unit") or "")
el.addRow("Unit:", unit_input)
refdes_input = QtGui.QLineEdit()
ref_des = entry.get("reference_designators") or []
refdes_input.setText(", ".join(ref_des))
el.addRow("Ref Designators:", refdes_input)
btn_box = QtGui.QDialogButtonBox(
QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel
)
btn_box.accepted.connect(edit_dlg.accept)
btn_box.rejected.connect(edit_dlg.reject)
el.addRow(btn_box)
if edit_dlg.exec_() != QtGui.QDialog.Accepted:
return
new_qty = None
qty_text = qty_input.text().strip()
if qty_text:
try:
new_qty = float(qty_text)
except ValueError:
QtGui.QMessageBox.warning(dialog, "BOM", "Quantity must be a number.")
return
new_unit = unit_input.text().strip() or None
new_type = type_combo.currentText()
new_ref_des = None
refdes_text = refdes_input.text().strip()
if refdes_text:
new_ref_des = [r.strip() for r in refdes_text.split(",") if r.strip()]
else:
new_ref_des = []
try:
_client.update_bom_entry(
part_number,
child_pn,
quantity=new_qty,
unit=new_unit,
rel_type=new_type,
ref_des=new_ref_des,
)
load_bom()
except Exception as exc:
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to update entry:\n{exc}")
def on_remove():
selected = bom_table.selectedItems()
if not selected:
return
row = selected[0].row()
if row < 0 or row >= len(bom_data):
return
entry = bom_data[row]
child_pn = entry.get("child_part_number", "")
reply = QtGui.QMessageBox.question(
dialog,
"Remove BOM Entry",
f"Remove {child_pn} from BOM?",
QtGui.QMessageBox.Yes | QtGui.QMessageBox.No,
)
if reply != QtGui.QMessageBox.Yes:
return
try:
_client.delete_bom_entry(part_number, child_pn)
load_bom()
except Exception as exc:
QtGui.QMessageBox.warning(dialog, "BOM", f"Failed to remove entry:\n{exc}")
add_btn.clicked.connect(on_add)
edit_btn.clicked.connect(on_edit)
remove_btn.clicked.connect(on_remove)
# Close button
close_layout = QtGui.QHBoxLayout()
close_layout.addStretch()
close_btn = QtGui.QPushButton("Close")
close_btn.clicked.connect(dialog.accept)
close_layout.addWidget(close_btn)
layout.addLayout(close_layout)
# Initial data load
load_bom()
load_where_used()
dialog.exec_()
def IsActive(self):
return FreeCAD.ActiveDocument is not None
# ---------------------------------------------------------------------------
# Silo Mode toggle - swap Ctrl+O/S/N between standard and Silo commands
# ---------------------------------------------------------------------------
# Stored original shortcuts so they can be restored on toggle-off
_original_shortcuts: Dict[str, Any] = {}
def _swap_shortcuts(mapping, enable_silo):
"""Swap keyboard shortcuts between standard and Silo commands.
mapping: list of (std_cmd, silo_cmd, shortcut) tuples
enable_silo: True to assign shortcuts to Silo commands, False to restore.
"""
from PySide import QtGui
mw = FreeCADGui.getMainWindow()
if mw is None:
return
for std_cmd, silo_cmd, shortcut in mapping:
if enable_silo:
# Save and clear the standard command's shortcut
std_action = mw.findChild(QtGui.QAction, std_cmd)
if std_action:
_original_shortcuts[std_cmd] = std_action.shortcut().toString()
std_action.setShortcut("")
# Assign the shortcut to the Silo command
silo_action = mw.findChild(QtGui.QAction, silo_cmd)
if silo_action:
silo_action.setShortcut(shortcut)
else:
# Clear the Silo command's shortcut
silo_action = mw.findChild(QtGui.QAction, silo_cmd)
if silo_action:
silo_action.setShortcut("")
# Restore the standard command's original shortcut
std_action = mw.findChild(QtGui.QAction, std_cmd)
if std_action and std_cmd in _original_shortcuts:
std_action.setShortcut(_original_shortcuts.pop(std_cmd))
_SHORTCUT_MAP = [
("Std_Open", "Silo_Open", "Ctrl+O"),
("Std_Save", "Silo_Save", "Ctrl+S"),
("Std_New", "Silo_New", "Ctrl+N"),
]
class Silo_ToggleMode:
"""Toggle between standard file operations and Silo equivalents."""
def GetResources(self):
return {
"MenuText": "Silo Mode",
"ToolTip": (
"Toggle between standard file operations and Silo equivalents.\n"
"When ON: Ctrl+O/S/N use Silo Open/Save/New.\n"
"When OFF: Standard FreeCAD file operations."
),
"Pixmap": _icon("silo"),
"Checkable": True,
}
def Activated(self, checked):
param = FreeCAD.ParamGet(_PREF_GROUP)
if checked:
_swap_shortcuts(_SHORTCUT_MAP, enable_silo=True)
param.SetBool("SiloMode", True)
FreeCAD.Console.PrintMessage("Silo mode enabled\n")
else:
_swap_shortcuts(_SHORTCUT_MAP, enable_silo=False)
param.SetBool("SiloMode", False)
FreeCAD.Console.PrintMessage("Silo mode disabled\n")
def IsActive(self):
return True
# ---------------------------------------------------------------------------
# SSE live-update listener
# ---------------------------------------------------------------------------
class SiloEventListener(QtCore.QThread):
"""Background thread that listens to Server-Sent Events from the Silo API.
Emits Qt signals when items are updated or new revisions are created.
Degrades gracefully if the server does not support the ``/api/events``
endpoint.
"""
item_updated = QtCore.Signal(str) # part_number
revision_created = QtCore.Signal(str, int) # part_number, revision
connection_status = QtCore.Signal(str) # "connected" / "disconnected" / "unsupported"
_MAX_FAST_RETRIES = 3
_FAST_RETRY_SECS = 5
_SLOW_RETRY_SECS = 30
def __init__(self, parent=None):
super().__init__(parent)
self._stop_flag = False
self._response = None
# -- public API ---------------------------------------------------------
def stop(self):
self._stop_flag = True
# Close the socket so readline() unblocks immediately
try:
if self._response is not None:
self._response.close()
except Exception:
pass
self.wait(5000)
# -- thread entry -------------------------------------------------------
def run(self):
retries = 0
while not self._stop_flag:
try:
self._listen()
# _listen returns normally only on clean EOF / stop
if self._stop_flag:
return
retries += 1
except _SSEUnsupported:
self.connection_status.emit("unsupported")
return
except Exception:
retries += 1
self.connection_status.emit("disconnected")
if retries <= self._MAX_FAST_RETRIES:
delay = self._FAST_RETRY_SECS
else:
delay = self._SLOW_RETRY_SECS
# Interruptible sleep
for _ in range(delay):
if self._stop_flag:
return
self.msleep(1000)
# -- SSE stream reader --------------------------------------------------
def _listen(self):
url = f"{_get_api_url().rstrip('/')}/api/events"
headers = {"Accept": "text/event-stream"}
headers.update(_get_auth_headers())
req = urllib.request.Request(url, headers=headers, method="GET")
try:
self._response = urllib.request.urlopen(req, context=_get_ssl_context(), timeout=90)
except urllib.error.HTTPError as e:
if e.code in (404, 501):
raise _SSEUnsupported()
raise
except urllib.error.URLError:
raise
self.connection_status.emit("connected")
event_type = ""
data_buf = ""
for raw_line in self._response:
if self._stop_flag:
return
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
if line == "":
# Blank line = dispatch event
if data_buf:
self._dispatch(event_type or "message", data_buf.strip())
event_type = ""
data_buf = ""
elif line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data_buf += line[5:].strip() + "\n"
# Ignore comments (lines starting with ':') and other fields
def _dispatch(self, event_type, data):
try:
payload = json.loads(data)
except (json.JSONDecodeError, ValueError):
return
pn = payload.get("part_number", "")
if not pn:
return
if event_type in ("item_updated", "message"):
self.item_updated.emit(pn)
elif event_type == "revision_created":
rev = payload.get("revision", 0)
self.revision_created.emit(pn, int(rev))
class _SSEUnsupported(Exception):
"""Raised when the server does not support the SSE endpoint."""
# ---------------------------------------------------------------------------
# Auth dock widget
# ---------------------------------------------------------------------------
class SiloAuthDockWidget:
"""Content widget for the Silo Database Auth dock panel."""
def __init__(self):
from PySide import QtCore, QtGui
self.widget = QtGui.QWidget()
self._event_listener = None
self._build_ui()
self._refresh_status()
self._timer = QtCore.QTimer(self.widget)
self._timer.timeout.connect(self._refresh_status)
self._timer.start(30000)
# -- UI construction ----------------------------------------------------
def _build_ui(self):
from PySide import QtCore, QtGui
layout = QtGui.QVBoxLayout(self.widget)
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(6)
# Status row
status_row = QtGui.QHBoxLayout()
status_row.setSpacing(6)
self._status_dot = QtGui.QLabel("\u2b24")
self._status_dot.setFixedWidth(16)
self._status_dot.setAlignment(QtCore.Qt.AlignCenter)
self._status_label = QtGui.QLabel("Checking...")
status_row.addWidget(self._status_dot)
status_row.addWidget(self._status_label)
status_row.addStretch()
layout.addLayout(status_row)
# User row
user_row = QtGui.QHBoxLayout()
user_row.setSpacing(6)
user_lbl = QtGui.QLabel("User:")
user_lbl.setStyleSheet("color: #888;")
self._user_label = QtGui.QLabel("(not logged in)")
user_row.addWidget(user_lbl)
user_row.addWidget(self._user_label)
user_row.addStretch()
layout.addLayout(user_row)
# Role row
role_row = QtGui.QHBoxLayout()
role_row.setSpacing(6)
role_lbl = QtGui.QLabel("Role:")
role_lbl.setStyleSheet("color: #888;")
self._role_label = QtGui.QLabel("")
self._role_label.setStyleSheet("font-size: 11px;")
role_row.addWidget(role_lbl)
role_row.addWidget(self._role_label)
role_row.addStretch()
layout.addLayout(role_row)
layout.addSpacing(4)
# URL row (compact display)
url_row = QtGui.QHBoxLayout()
url_row.setSpacing(6)
url_lbl = QtGui.QLabel("URL:")
url_lbl.setStyleSheet("color: #888;")
self._url_label = QtGui.QLabel("")
self._url_label.setStyleSheet("font-size: 11px;")
self._url_label.setWordWrap(True)
url_row.addWidget(url_lbl)
url_row.addWidget(self._url_label, 1)
layout.addLayout(url_row)
layout.addSpacing(4)
# Live updates row
sse_row = QtGui.QHBoxLayout()
sse_row.setSpacing(6)
sse_lbl = QtGui.QLabel("Live:")
sse_lbl.setStyleSheet("color: #888;")
self._sse_label = QtGui.QLabel("")
self._sse_label.setStyleSheet("font-size: 11px;")
sse_row.addWidget(sse_lbl)
sse_row.addWidget(self._sse_label)
sse_row.addStretch()
layout.addLayout(sse_row)
layout.addSpacing(4)
# Buttons
btn_row = QtGui.QHBoxLayout()
btn_row.setSpacing(6)
self._login_btn = QtGui.QPushButton("Login")
self._login_btn.clicked.connect(self._on_login_clicked)
btn_row.addWidget(self._login_btn)
settings_btn = QtGui.QToolButton()
settings_btn.setText("\u2699")
settings_btn.setToolTip("Silo Settings")
settings_btn.setFixedSize(28, 28)
settings_btn.clicked.connect(self._on_settings_clicked)
btn_row.addStretch()
btn_row.addWidget(settings_btn)
layout.addLayout(btn_row)
layout.addStretch()
# -- Status refresh -----------------------------------------------------
def _refresh_status(self):
from PySide import QtGui
# Update URL display
self._url_label.setText(_get_api_url())
has_token = _client.is_authenticated()
username = _client.auth_username()
role = _client.auth_role()
source = _client.auth_source()
# Check server connectivity
try:
reachable, msg = _client.check_connection()
except Exception:
reachable = False
# If reachable and we have a token, validate it against the server
authed = False
if reachable and has_token:
user = _client.get_current_user()
if user and user.get("username"):
authed = True
username = user["username"]
role = user.get("role", "")
source = user.get("auth_source", "")
_save_auth_info(username=username, role=role, source=source)
if authed:
self._user_label.setText(username)
role_text = role or ""
if source:
role_text += f" ({source})" if role_text else source
self._role_label.setText(role_text)
else:
self._user_label.setText("(not logged in)")
self._role_label.setText("")
# Update button state
try:
self._login_btn.clicked.disconnect()
except RuntimeError:
pass
if reachable and authed:
self._status_dot.setStyleSheet("color: #4CAF50; font-size: 10px;")
self._status_label.setText("Connected")
self._login_btn.setText("Logout")
self._login_btn.clicked.connect(self._on_logout_clicked)
elif reachable and has_token and not authed:
# Token exists but is invalid/expired
self._status_dot.setStyleSheet("color: #FF9800; font-size: 10px;")
self._status_label.setText("Token invalid")
self._login_btn.setText("Login")
self._login_btn.clicked.connect(self._on_login_clicked)
elif reachable and not has_token:
self._status_dot.setStyleSheet("color: #FFC107; font-size: 10px;")
self._status_label.setText("Connected (no auth)")
self._login_btn.setText("Login")
self._login_btn.clicked.connect(self._on_login_clicked)
else:
self._status_dot.setStyleSheet("color: #F44336; font-size: 10px;")
self._status_label.setText("Disconnected")
self._login_btn.setText("Login")
self._login_btn.clicked.connect(self._on_login_clicked)
# Manage SSE listener based on auth state
self._sync_event_listener(authed)
# -- SSE listener management --------------------------------------------
def _sync_event_listener(self, authed):
"""Start or stop the SSE listener depending on authentication state."""
if authed:
if self._event_listener is None or not self._event_listener.isRunning():
self._event_listener = SiloEventListener()
self._event_listener.item_updated.connect(self._on_remote_change)
self._event_listener.revision_created.connect(self._on_remote_revision)
self._event_listener.connection_status.connect(self._on_sse_status)
self._event_listener.start()
else:
if self._event_listener is not None and self._event_listener.isRunning():
self._event_listener.stop()
self._sse_label.setText("")
def _on_sse_status(self, status):
if status == "connected":
self._sse_label.setText("Listening")
self._sse_label.setStyleSheet("font-size: 11px; color: #4CAF50;")
elif status == "disconnected":
self._sse_label.setText("Reconnecting...")
self._sse_label.setStyleSheet("font-size: 11px; color: #FF9800;")
elif status == "unsupported":
self._sse_label.setText("Not available")
self._sse_label.setStyleSheet("font-size: 11px; color: #888;")
def _on_remote_change(self, part_number):
FreeCAD.Console.PrintMessage(f"Silo: Part {part_number} updated on server\n")
mw = FreeCADGui.getMainWindow()
if mw is not None:
mw.statusBar().showMessage(f"Silo: {part_number} updated on server", 5000)
self._refresh_activity_panel()
def _on_remote_revision(self, part_number, revision):
FreeCAD.Console.PrintMessage(f"Silo: New revision {revision} for {part_number}\n")
mw = FreeCADGui.getMainWindow()
if mw is not None:
mw.statusBar().showMessage(f"Silo: {part_number} rev {revision} available", 5000)
self._refresh_activity_panel()
def _refresh_activity_panel(self):
"""Refresh the Database Activity panel if it exists."""
from PySide import QtWidgets
mw = FreeCADGui.getMainWindow()
if mw is None:
return
panel = mw.findChild(QtWidgets.QDockWidget, "SiloDatabaseActivity")
if panel is None:
return
activity_list = panel.findChild(QtWidgets.QListWidget)
if activity_list is None:
return
activity_list.clear()
try:
items = _client.list_items()
if isinstance(items, list):
for item in items[:20]:
pn = item.get("part_number", "")
desc = item.get("description", "")
updated = item.get("updated_at", "")
if updated:
updated = updated[:10]
activity_list.addItem(f"{pn} - {desc} - {updated}")
if activity_list.count() == 0:
activity_list.addItem("(No items in database)")
except Exception:
activity_list.addItem("(Unable to refresh activity)")
# -- Actions ------------------------------------------------------------
def _on_login_clicked(self):
self._show_login_dialog()
def _on_logout_clicked(self):
_client.logout()
FreeCAD.Console.PrintMessage("Silo: Logged out\n")
self._refresh_status()
def _on_settings_clicked(self):
FreeCADGui.runCommand("Silo_Settings")
# Refresh after settings may have changed
self._refresh_status()
def _show_login_dialog(self):
from PySide import QtCore, QtGui
dialog = QtGui.QDialog(self.widget)
dialog.setWindowTitle("Silo Login")
dialog.setMinimumWidth(380)
layout = QtGui.QVBoxLayout(dialog)
# Server info
server_label = QtGui.QLabel(f"Server: {_get_api_url()}")
server_label.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(server_label)
layout.addSpacing(4)
info_label = QtGui.QLabel(
"Enter your credentials to create a persistent API token. "
"Supports local accounts and LDAP (FreeIPA)."
)
info_label.setWordWrap(True)
info_label.setStyleSheet("color: #888; font-size: 11px;")
layout.addWidget(info_label)
layout.addSpacing(8)
# Username
user_label = QtGui.QLabel("Username:")
layout.addWidget(user_label)
user_input = QtGui.QLineEdit()
user_input.setPlaceholderText("Username")
last_user = _get_auth_username()
if last_user:
user_input.setText(last_user)
layout.addWidget(user_input)
layout.addSpacing(4)
# Password
pass_label = QtGui.QLabel("Password:")
layout.addWidget(pass_label)
pass_input = QtGui.QLineEdit()
pass_input.setEchoMode(QtGui.QLineEdit.Password)
pass_input.setPlaceholderText("Password")
layout.addWidget(pass_input)
layout.addSpacing(4)
# Error / status label
status_label = QtGui.QLabel("")
status_label.setWordWrap(True)
status_label.setVisible(False)
layout.addWidget(status_label)
layout.addSpacing(8)
# Buttons
btn_layout = QtGui.QHBoxLayout()
login_btn = QtGui.QPushButton("Login")
cancel_btn = QtGui.QPushButton("Cancel")
btn_layout.addStretch()
btn_layout.addWidget(login_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
def on_login():
username = user_input.text().strip()
password = pass_input.text()
if not username or not password:
status_label.setText("Username and password are required.")
status_label.setStyleSheet("color: #F44336;")
status_label.setVisible(True)
return
# Disable inputs during login
login_btn.setEnabled(False)
status_label.setText("Logging in...")
status_label.setStyleSheet("color: #888;")
status_label.setVisible(True)
# Process events so the user sees the status update
from PySide.QtWidgets import QApplication
QApplication.processEvents()
try:
result = _client.login(username, password)
role = result.get("role", "")
source = result.get("auth_source", "")
msg = f"Silo: Logged in as {username}"
if role:
msg += f" ({role})"
if source:
msg += f" via {source}"
FreeCAD.Console.PrintMessage(msg + "\n")
dialog.accept()
except RuntimeError as e:
status_label.setText(str(e))
status_label.setStyleSheet("color: #F44336;")
status_label.setVisible(True)
login_btn.setEnabled(True)
login_btn.clicked.connect(on_login)
cancel_btn.clicked.connect(dialog.reject)
pass_input.returnPressed.connect(on_login)
user_input.returnPressed.connect(lambda: pass_input.setFocus())
dialog.exec_()
self._refresh_status()
class Silo_Auth:
"""Show the Silo authentication panel."""
def GetResources(self):
return {
"MenuText": "Authentication",
"ToolTip": "Show Silo authentication status and login",
"Pixmap": _icon("auth"),
}
def Activated(self):
from PySide import QtGui
mw = FreeCADGui.getMainWindow()
if mw is None:
return
panel = mw.findChild(QtGui.QDockWidget, "SiloDatabaseAuth")
if panel:
panel.show()
panel.raise_()
def IsActive(self):
return True
# Register commands
FreeCADGui.addCommand("Silo_Open", Silo_Open())
FreeCADGui.addCommand("Silo_New", Silo_New())
FreeCADGui.addCommand("Silo_Save", Silo_Save())
FreeCADGui.addCommand("Silo_Commit", Silo_Commit())
FreeCADGui.addCommand("Silo_Pull", Silo_Pull())
FreeCADGui.addCommand("Silo_Push", Silo_Push())
FreeCADGui.addCommand("Silo_Info", Silo_Info())
FreeCADGui.addCommand("Silo_BOM", Silo_BOM())
FreeCADGui.addCommand("Silo_TagProjects", Silo_TagProjects())
FreeCADGui.addCommand("Silo_Rollback", Silo_Rollback())
FreeCADGui.addCommand("Silo_SetStatus", Silo_SetStatus())
FreeCADGui.addCommand("Silo_Settings", Silo_Settings())
FreeCADGui.addCommand("Silo_ToggleMode", Silo_ToggleMode())
FreeCADGui.addCommand("Silo_Auth", Silo_Auth())