"""Silo API client for LibreOffice Calc extension.

Adapted from pkg/freecad/silo_commands.py SiloClient.  Uses urllib (no
external dependencies) and the same auth flow: session login to obtain a
persistent API token stored in a local JSON settings file.
"""

import http.cookiejar
import json
import os
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple

from . import settings as _settings

# ---------------------------------------------------------------------------
# SSL helpers
# ---------------------------------------------------------------------------

# System CA bundle locations probed in order; only the first one that exists
# is loaded.
_SYSTEM_CA_BUNDLES = (
    "/etc/ssl/certs/ca-certificates.crt",
    "/etc/pki/tls/certs/ca-bundle.crt",
)


def _try_load_ca(ctx: ssl.SSLContext, path: str) -> None:
    """Best-effort load of a CA file into *ctx*; unreadable files are ignored."""
    try:
        ctx.load_verify_locations(path)
    except (ssl.SSLError, OSError):
        # Deliberately non-fatal: a broken or unreadable bundle must not
        # prevent the client from using the default trust store.
        pass


def _get_ssl_context() -> ssl.SSLContext:
    """Build an SSL context honouring the user's verify/cert preferences.

    Reads ``ssl_verify`` and ``ssl_cert_path`` from the settings file.  When
    verification is disabled the returned context performs no certificate or
    hostname checks.
    """
    cfg = _settings.load()
    ctx = ssl.create_default_context()

    if not cfg.get("ssl_verify", True):
        # check_hostname must be switched off before verify_mode can be
        # set to CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        return ctx

    custom_cert = cfg.get("ssl_cert_path", "")
    if custom_cert and os.path.isfile(custom_cert):
        _try_load_ca(ctx, custom_cert)

    # Load system CA bundles (bundled Python may not find them automatically)
    for ca_path in _SYSTEM_CA_BUNDLES:
        if os.path.isfile(ca_path):
            _try_load_ca(ctx, ca_path)
            break
    return ctx


def _error_body(err: urllib.error.HTTPError) -> str:
    """Return the response body of *err* as text, tolerating bad encodings."""
    return err.read().decode("utf-8", errors="replace")


def _quote(segment: str) -> str:
    """Percent-encode *segment* for use as a single URL path component."""
    return urllib.parse.quote(segment, safe="")


# ---------------------------------------------------------------------------
# SiloClient
# ---------------------------------------------------------------------------


class SiloClient:
    """HTTP client for the Silo REST API."""

    def __init__(self, base_url: Optional[str] = None):
        """Create a client.

        Args:
            base_url: Optional API base URL.  When omitted, the URL is
                resolved from the settings file or the ``SILO_API_URL``
                environment variable each time it is needed.
        """
        self._explicit_url = base_url

    # -- URL helpers --------------------------------------------------------

    @property
    def base_url(self) -> str:
        """API base URL without a trailing slash.

        Resolution order: explicit constructor argument, ``api_url`` from
        settings, ``SILO_API_URL`` environment variable, then
        ``http://localhost:8080/api``.  For the non-explicit sources, a bare
        origin (no path) gets ``/api`` appended.
        """
        if self._explicit_url:
            return self._explicit_url.rstrip("/")

        cfg = _settings.load()
        # ``or ""`` guards against an explicit null in the settings file.
        url = (cfg.get("api_url") or "").rstrip("/")
        if not url:
            # Strip here as well so a trailing slash in the environment
            # variable does not yield "//" in request URLs or defeat the
            # "/api" suffix check in ``_origin``.
            url = os.environ.get(
                "SILO_API_URL", "http://localhost:8080/api"
            ).rstrip("/")

        # Auto-append /api for bare origins
        parsed = urllib.parse.urlparse(url)
        if not parsed.path or parsed.path == "/":
            url = url + "/api"
        return url

    @property
    def _origin(self) -> str:
        """Server origin (without /api) for auth endpoints."""
        base = self.base_url
        suffix = "/api"
        if base.endswith(suffix):
            return base[: -len(suffix)]
        return base

    # -- Auth headers -------------------------------------------------------

    def _auth_headers(self) -> Dict[str, str]:
        """Return a Bearer ``Authorization`` header, or ``{}`` without a token.

        The token comes from settings (``api_token``) and falls back to the
        ``SILO_API_TOKEN`` environment variable.
        """
        token = _settings.load().get("api_token", "") or os.environ.get(
            "SILO_API_TOKEN", ""
        )
        if token:
            return {"Authorization": f"Bearer {token}"}
        return {}

    # -- Core HTTP ----------------------------------------------------------

    def _send(
        self,
        req: urllib.request.Request,
        error_label: str,
        clear_auth_on_401: bool = False,
    ) -> bytes:
        """Open *req* and return the response bytes.

        Args:
            req: Fully prepared request.
            error_label: Prefix used in the ``RuntimeError`` message for
                HTTP errors (e.g. ``"API error"``).
            clear_auth_on_401: When True, stored credentials are cleared if
                the server answers 401.

        Raises:
            RuntimeError: On any HTTP error status or connection failure.
        """
        try:
            with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
                return resp.read()
        except urllib.error.HTTPError as e:
            if clear_auth_on_401 and e.code == 401:
                _settings.clear_auth()
            raise RuntimeError(f"{error_label} {e.code}: {_error_body(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None,
        raw: bool = False,
    ) -> Any:
        """Make an authenticated JSON request.

        Args:
            method: HTTP method.
            path: Path appended to ``base_url`` (must start with ``/``).
            data: Optional JSON-serialisable body.  An empty dict is sent as
                ``{}``; only ``None`` means "no body".
            raw: If True the response bytes are returned instead of parsed
                JSON.

        Returns:
            Parsed JSON, the raw bytes when *raw* is True, or ``None`` when
            the server returned an empty body.

        Raises:
            RuntimeError: On HTTP or connection errors.  A 401 additionally
                clears the stored credentials.
        """
        url = f"{self.base_url}{path}"
        headers = {"Content-Type": "application/json"}
        headers.update(self._auth_headers())
        body = json.dumps(data).encode() if data is not None else None
        req = urllib.request.Request(url, data=body, headers=headers, method=method)

        payload = self._send(req, "API error", clear_auth_on_401=True)
        if raw:
            return payload
        if not payload.strip():
            # e.g. 204 No Content -- nothing to parse.
            return None
        return json.loads(payload.decode())

    def _download(self, path: str) -> bytes:
        """Download raw bytes from an API path.

        Raises:
            RuntimeError: On HTTP or connection errors.
        """
        url = f"{self.base_url}{path}"
        req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
        return self._send(req, "Download error")

    def _upload_ods(
        self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
    ) -> Any:
        """POST an ODS file as multipart/form-data.

        Args:
            path: API path appended to ``base_url``.
            ods_bytes: File content.
            filename: Name reported in the ``file`` form field.

        Returns:
            The parsed JSON response.

        Raises:
            RuntimeError: On HTTP or connection errors.
        """
        boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:]
        # Quotes and line breaks would terminate or split the header line,
        # so percent-encode them as browsers do for form-data filenames.
        safe_name = (
            filename.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")
        )
        head = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="{safe_name}"\r\n'
            "Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
        )
        tail = f"\r\n--{boundary}--\r\n"
        body = b"".join((head.encode("utf-8"), ods_bytes, tail.encode("utf-8")))

        url = f"{self.base_url}{path}"
        headers = {
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Content-Length": str(len(body)),
        }
        headers.update(self._auth_headers())
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        return json.loads(self._send(req, "Upload error").decode())

    # -- Authentication -----------------------------------------------------

    def login(self, username: str, password: str) -> Dict[str, Any]:
        """Session login + create persistent API token (same flow as FreeCAD).

        Posts the credentials to ``/login``, verifies the session through
        ``/api/auth/me``, creates an API token via ``/api/auth/tokens`` and
        stores it with ``settings.save_auth``.

        Returns:
            Dict with ``username``, ``role``, ``auth_source`` and
            ``token_name``.

        Raises:
            RuntimeError: If any step fails or the server returns no token.
        """
        ctx = _get_ssl_context()
        cookie_jar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(cookie_jar),
            urllib.request.HTTPSHandler(context=ctx),
        )

        # Step 1: POST credentials to /login
        login_url = f"{self._origin}/login"
        form_data = urllib.parse.urlencode(
            {"username": username, "password": password}
        ).encode()
        req = urllib.request.Request(
            login_url,
            data=form_data,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        try:
            # Only the session cookie matters; close the response promptly.
            with opener.open(req):
                pass
        except urllib.error.HTTPError as e:
            # A redirect status surfacing as an error is not treated as a
            # login failure; the session is verified in step 2.
            if e.code not in (302, 303):
                raise RuntimeError(f"Login failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 2: Verify via /api/auth/me
        me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
        try:
            with opener.open(me_req) as resp:
                user_info = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 401:
                raise RuntimeError(
                    "Login failed: invalid username or password"
                ) from e
            raise RuntimeError(f"Login verification failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 3: Create API token
        hostname = socket.gethostname()
        token_body = json.dumps(
            {"name": f"LibreOffice Calc ({hostname})", "expires_in_days": 90}
        ).encode()
        token_req = urllib.request.Request(
            f"{self._origin}/api/auth/tokens",
            data=token_body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        try:
            with opener.open(token_req) as resp:
                token_result = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Failed to create API token (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        raw_token = token_result.get("token", "")
        if not raw_token:
            raise RuntimeError("Server did not return an API token")

        resolved_user = user_info.get("username", username)
        role = user_info.get("role", "")
        auth_source = user_info.get("auth_source", "")

        _settings.save_auth(
            username=resolved_user,
            role=role,
            source=auth_source,
            token=raw_token,
        )

        return {
            "username": resolved_user,
            "role": role,
            "auth_source": auth_source,
            "token_name": token_result.get("name", ""),
        }

    def logout(self):
        """Clear the locally stored credentials."""
        _settings.clear_auth()

    def is_authenticated(self) -> bool:
        """Return True if a token is available from settings or environment."""
        cfg = _settings.load()
        return bool(cfg.get("api_token") or os.environ.get("SILO_API_TOKEN"))

    def get_current_user(self) -> Optional[Dict[str, Any]]:
        """Return the ``/auth/me`` payload, or None if the request fails."""
        try:
            return self._request("GET", "/auth/me")
        except RuntimeError:
            return None

    def check_connection(self) -> Tuple[bool, str]:
        """Probe ``<origin>/health`` with a 5 second timeout.

        Returns:
            ``(reachable, message)``.  An HTTP error status still counts as
            reachable because the server answered.
        """
        url = f"{self._origin}/health"
        req = urllib.request.Request(url, method="GET")
        try:
            with urllib.request.urlopen(
                req, context=_get_ssl_context(), timeout=5
            ) as resp:
                return True, f"OK ({resp.status})"
        except urllib.error.HTTPError as e:
            return True, f"Server error ({e.code})"
        except urllib.error.URLError as e:
            return False, str(e.reason)
        except Exception as e:
            # Probe boundary: report any other failure instead of raising.
            return False, str(e)

    # -- Items --------------------------------------------------------------

    def get_item(self, part_number: str) -> Dict[str, Any]:
        """GET ``/items/{part_number}``."""
        return self._request("GET", f"/items/{_quote(part_number)}")

    def list_items(self, search: str = "", project: str = "", limit: int = 100) -> list:
        """GET ``/items`` with optional ``search`` and ``project`` filters."""
        params = [f"limit={limit}"]
        if search:
            params.append(f"search={urllib.parse.quote(search)}")
        if project:
            params.append(f"project={urllib.parse.quote(project)}")
        return self._request("GET", "/items?" + "&".join(params))

    def create_item(
        self,
        schema: str,
        category: str,
        description: str = "",
        projects: Optional[List[str]] = None,
        sourcing_type: str = "",
        sourcing_link: str = "",
        standard_cost: Optional[float] = None,
        long_description: str = "",
    ) -> Dict[str, Any]:
        """POST ``/items``.

        ``schema``, ``category`` and ``description`` are always sent; the
        remaining fields are included only when given a non-empty value
        (``standard_cost`` whenever it is not None).
        """
        data: Dict[str, Any] = {
            "schema": schema,
            "category": category,
            "description": description,
        }
        if projects:
            data["projects"] = projects
        if sourcing_type:
            data["sourcing_type"] = sourcing_type
        if sourcing_link:
            data["sourcing_link"] = sourcing_link
        if standard_cost is not None:
            data["standard_cost"] = standard_cost
        if long_description:
            data["long_description"] = long_description
        return self._request("POST", "/items", data)

    def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
        """PUT ``/items/{part_number}`` with *fields* as the JSON body."""
        return self._request("PUT", f"/items/{_quote(part_number)}", fields)

    # -- Projects -----------------------------------------------------------

    def get_projects(self) -> list:
        """GET ``/projects``."""
        return self._request("GET", "/projects")

    def get_project_items(self, code: str) -> list:
        """GET ``/projects/{code}/items``."""
        return self._request("GET", f"/projects/{_quote(code)}/items")

    def add_item_projects(
        self, part_number: str, project_codes: List[str]
    ) -> Dict[str, Any]:
        """POST *project_codes* to ``/items/{part_number}/projects``."""
        return self._request(
            "POST",
            f"/items/{_quote(part_number)}/projects",
            {"projects": project_codes},
        )

    def get_item_projects(self, part_number: str) -> list:
        """GET ``/items/{part_number}/projects``."""
        return self._request("GET", f"/items/{_quote(part_number)}/projects")

    # -- Schemas ------------------------------------------------------------

    def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
        """GET ``/schemas/{name}``."""
        return self._request("GET", f"/schemas/{_quote(name)}")

    def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
        """GET ``/schemas/{name}/properties``."""
        return self._request("GET", f"/schemas/{_quote(name)}/properties")

    # -- BOM ----------------------------------------------------------------

    def get_bom(self, part_number: str) -> list:
        """GET ``/items/{part_number}/bom``."""
        return self._request("GET", f"/items/{_quote(part_number)}/bom")

    def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
        """GET ``/items/{part_number}/bom/expanded`` with a ``depth`` parameter."""
        return self._request(
            "GET",
            f"/items/{_quote(part_number)}/bom/expanded?depth={depth}",
        )

    def add_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        rel_type: str = "component",
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """POST a new BOM entry to ``/items/{parent_pn}/bom``.

        ``quantity`` is sent when not None; ``metadata`` when non-empty.
        """
        data: Dict[str, Any] = {
            "child_part_number": child_pn,
            "rel_type": rel_type,
        }
        if quantity is not None:
            data["quantity"] = quantity
        if metadata:
            data["metadata"] = metadata
        return self._request("POST", f"/items/{_quote(parent_pn)}/bom", data)

    def update_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """PUT changes to ``/items/{parent_pn}/bom/{child_pn}``.

        ``quantity`` is sent when not None; ``metadata`` when non-empty.
        """
        data: Dict[str, Any] = {}
        if quantity is not None:
            data["quantity"] = quantity
        if metadata:
            data["metadata"] = metadata
        return self._request(
            "PUT",
            f"/items/{_quote(parent_pn)}/bom/{_quote(child_pn)}",
            data,
        )

    # -- Revisions ----------------------------------------------------------

    def get_revisions(self, part_number: str) -> list:
        """GET ``/items/{part_number}/revisions``."""
        return self._request("GET", f"/items/{_quote(part_number)}/revisions")

    def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
        """GET ``/items/{part_number}/revisions/{revision}``."""
        return self._request(
            "GET",
            f"/items/{_quote(part_number)}/revisions/{revision}",
        )

    # -- ODS endpoints ------------------------------------------------------

    def download_bom_ods(self, part_number: str) -> bytes:
        """Download ``/items/{part_number}/bom/export.ods`` as bytes."""
        return self._download(f"/items/{_quote(part_number)}/bom/export.ods")

    def download_project_sheet(self, project_code: str) -> bytes:
        """Download ``/projects/{project_code}/sheet.ods`` as bytes."""
        return self._download(f"/projects/{_quote(project_code)}/sheet.ods")

    def upload_sheet_diff(
        self, ods_bytes: bytes, filename: str = "sheet.ods"
    ) -> Dict[str, Any]:
        """Upload an ODS file to ``/sheets/diff`` and return the JSON reply."""
        return self._upload_ods("/sheets/diff", ods_bytes, filename)

    # -- Part number generation ---------------------------------------------

    def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
        """POST ``/generate-part-number`` for the given schema and category."""
        return self._request(
            "POST",
            "/generate-part-number",
            {"schema": schema, "category": category},
        )