Files
silo/pkg/calc/pythonpath/silo_calc/client.py
Zoe Forbes 36a8d9995d feat: LibreOffice Calc extension, ODS library, AI description, audit design
Calc extension (pkg/calc/):
- Python UNO ProtocolHandler with 8 toolbar commands
- SiloClient HTTP client adapted from FreeCAD workbench
- Pull BOM/Project: populates sheets with 28-col format, hidden property
  columns, row hash tracking, auto project tagging
- Push: row classification, create/update items, conflict detection
- Completion wizard: 3-step category/description/fields with PN conflict
  resolution dialog
- OpenRouter AI integration: generate standardized descriptions from seller
  text, configurable model/instructions, review dialog
- Settings: JSON persistence, env var fallbacks, OpenRouter fields
- 31 unit tests (no UNO/network required)

Go ODS library (internal/ods/):
- Pure Go ODS read/write (ZIP of XML, no headless LibreOffice)
- Writer, reader, 10 round-trip tests

Server ODS endpoints (internal/api/ods.go):
- GET /api/items/export.ods, template.ods, POST import.ods
- GET /api/items/{pn}/bom/export.ods
- GET /api/projects/{code}/sheet.ods
- POST /api/sheets/diff

Documentation:
- docs/CALC_EXTENSION.md: extension progress report
- docs/COMPONENT_AUDIT.md: web audit tool design with weighted scoring,
  assembly computed fields, batch AI assistance plan
2026-02-01 10:06:20 -06:00

448 lines
16 KiB
Python

"""Silo API client for LibreOffice Calc extension.
Adapted from pkg/freecad/silo_commands.py SiloClient. Uses urllib (no
external dependencies) and the same auth flow: session login to obtain a
persistent API token stored in a local JSON settings file.
"""
import http.cookiejar
import json
import os
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
from . import settings as _settings
# ---------------------------------------------------------------------------
# SSL helpers
# ---------------------------------------------------------------------------
def _get_ssl_context() -> ssl.SSLContext:
    """Return an SSL context configured from the saved user settings.

    Settings consulted (via ``_settings.load()``):

    * ``ssl_verify`` (default ``True``) -- when falsy, hostname checking and
      certificate verification are both switched off.
    * ``ssl_cert_path`` -- optional extra CA file, loaded only if it exists.

    With verification enabled, the first system CA bundle found at one of
    the well-known paths below is loaded as well. Every load is best-effort:
    a failure is ignored and the context keeps whatever it already trusts.
    """
    prefs = _settings.load()
    verify = prefs.get("ssl_verify", True)
    context = ssl.create_default_context()

    if not verify:
        # User opted out of verification: accept any certificate/hostname.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    def _load_quietly(pem_path: str) -> None:
        # Best-effort: an unreadable or malformed file must not block requests.
        try:
            context.load_verify_locations(pem_path)
        except Exception:
            pass

    user_cert = prefs.get("ssl_cert_path", "")
    if user_cert and os.path.isfile(user_cert):
        _load_quietly(user_cert)

    # Load system CA bundles (bundled Python may not find them automatically).
    # Only the first bundle that exists is tried.
    system_bundles = (
        "/etc/ssl/certs/ca-certificates.crt",
        "/etc/pki/tls/certs/ca-bundle.crt",
    )
    first_bundle = next(
        (candidate for candidate in system_bundles if os.path.isfile(candidate)),
        None,
    )
    if first_bundle is not None:
        _load_quietly(first_bundle)

    return context
# ---------------------------------------------------------------------------
# SiloClient
# ---------------------------------------------------------------------------
class SiloClient:
    """HTTP client for the Silo REST API.

    All requests go through ``urllib``. Authentication uses a bearer token
    read from the settings file (``api_token``) or, failing that, from the
    ``SILO_API_TOKEN`` environment variable. Transport and HTTP failures are
    reported to callers as ``RuntimeError``.
    """

    def __init__(self, base_url: Optional[str] = None):
        """Create a client.

        Args:
            base_url: Optional API base URL. When given it takes precedence
                over the settings file and the environment and is used as-is
                (only trailing slashes are stripped).
        """
        self._explicit_url = base_url

    # -- URL helpers --------------------------------------------------------

    @property
    def base_url(self) -> str:
        """API base URL without a trailing slash.

        Resolution order: explicit constructor argument, ``api_url`` from the
        settings file, ``SILO_API_URL`` environment variable, then
        ``http://localhost:8080/api``. For the last three, a bare origin
        (no path) gets ``/api`` appended.
        """
        if self._explicit_url:
            return self._explicit_url.rstrip("/")
        cfg = _settings.load()
        # ``or ""`` tolerates an explicit null stored in the settings file.
        url = (cfg.get("api_url") or "").rstrip("/")
        if not url:
            # Strip here too: otherwise "http://host/" becomes "http://host//api"
            # and "http://host/api/" defeats the "/api" suffix check in _origin.
            url = os.environ.get(
                "SILO_API_URL", "http://localhost:8080/api"
            ).rstrip("/")
        # Auto-append /api for bare origins
        parsed = urllib.parse.urlparse(url)
        if not parsed.path or parsed.path == "/":
            url = url + "/api"
        return url

    @property
    def _origin(self) -> str:
        """Server origin (without /api) for auth endpoints."""
        base = self.base_url
        return base[: -len("/api")] if base.endswith("/api") else base

    # -- Auth headers -------------------------------------------------------

    def _auth_headers(self) -> Dict[str, str]:
        """Return the ``Authorization`` header dict, or ``{}`` without a token."""
        token = _settings.load().get("api_token", "") or os.environ.get(
            "SILO_API_TOKEN", ""
        )
        if token:
            return {"Authorization": f"Bearer {token}"}
        return {}

    # -- Core HTTP ----------------------------------------------------------

    @staticmethod
    def _encode_body(data: Optional[Dict]) -> Optional[bytes]:
        """Serialize *data* as UTF-8 JSON; ``None`` means "send no body".

        An empty dict is still encoded (as ``{}``) so that a request declared
        as JSON never goes out with an empty body.
        """
        if data is None:
            return None
        return json.dumps(data).encode()

    @staticmethod
    def _error_text(err: urllib.error.HTTPError) -> str:
        """Read an HTTP error body as text without raising on bad bytes."""
        return err.read().decode("utf-8", errors="replace")

    @staticmethod
    def _build_multipart(boundary: str, ods_bytes: bytes, filename: str) -> bytes:
        """Build a single-file ``multipart/form-data`` body (field ``file``).

        Double quotes, CR and LF in *filename* are percent-escaped so the
        name cannot terminate the quoted parameter or inject header lines.
        """
        safe_name = (
            filename.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")
        )
        head = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="{safe_name}"\r\n'
            f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
        )
        tail = f"\r\n--{boundary}--\r\n"
        return b"".join((head.encode("utf-8"), ods_bytes, tail.encode("utf-8")))

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None,
        raw: bool = False,
    ) -> Any:
        """Make an authenticated JSON request. Returns parsed JSON.

        If *raw* is True the response bytes are returned instead.

        Args:
            method: HTTP method.
            path: Path appended to :attr:`base_url` (should start with ``/``).
            data: Optional JSON-serializable body; ``None`` sends no body.
            raw: Return the undecoded response bytes.

        Raises:
            RuntimeError: On an HTTP error status (stored credentials are
                cleared first when the status is 401) or a connection error.
        """
        url = f"{self.base_url}{path}"
        headers = {"Content-Type": "application/json"}
        headers.update(self._auth_headers())
        req = urllib.request.Request(
            url, data=self._encode_body(data), headers=headers, method=method
        )
        try:
            with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
                payload = resp.read()
        except urllib.error.HTTPError as e:
            if e.code == 401:
                _settings.clear_auth()
            raise RuntimeError(f"API error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e
        if raw:
            return payload
        return json.loads(payload.decode())

    def _download(self, path: str) -> bytes:
        """Download raw bytes from an API path.

        Raises:
            RuntimeError: On an HTTP error status or a connection error.
        """
        url = f"{self.base_url}{path}"
        req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
        try:
            with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
                return resp.read()
        except urllib.error.HTTPError as e:
            raise RuntimeError(
                f"Download error {e.code}: {self._error_text(e)}"
            ) from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    def _upload_ods(
        self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
    ) -> Any:
        """POST an ODS file as multipart/form-data and return the parsed JSON.

        Raises:
            RuntimeError: On an HTTP error status or a connection error.
        """
        # Random boundary: extremely unlikely to occur inside the payload and
        # not dependent on the per-process salted str hash.
        boundary = "----SiloCalcUpload" + os.urandom(16).hex()
        body = self._build_multipart(boundary, ods_bytes, filename)
        url = f"{self.base_url}{path}"
        headers = {
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Content-Length": str(len(body)),
        }
        headers.update(self._auth_headers())
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        try:
            with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Upload error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    # -- Authentication -----------------------------------------------------

    def login(self, username: str, password: str) -> Dict[str, Any]:
        """Session login + create persistent API token (same flow as FreeCAD).

        Steps: POST the credentials to ``/login`` with a cookie-aware opener,
        confirm the session via ``/api/auth/me``, then create a 90-day API
        token via ``/api/auth/tokens`` and persist it with
        ``_settings.save_auth``.

        Returns:
            Dict with ``username``, ``role``, ``auth_source`` and
            ``token_name``.

        Raises:
            RuntimeError: If any step fails (HTTP error, connection error, or
                the server returns no token).
        """
        ctx = _get_ssl_context()
        cookie_jar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(cookie_jar),
            urllib.request.HTTPSHandler(context=ctx),
        )

        # Step 1: POST credentials to /login
        login_url = f"{self._origin}/login"
        form_data = urllib.parse.urlencode(
            {"username": username, "password": password}
        ).encode()
        req = urllib.request.Request(
            login_url,
            data=form_data,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        try:
            # Only the session cookie matters; close the response right away.
            with opener.open(req):
                pass
        except urllib.error.HTTPError as e:
            # A redirect surfaced as an error still means the POST was accepted.
            if e.code not in (302, 303):
                raise RuntimeError(f"Login failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 2: Verify via /api/auth/me
        me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
        try:
            with opener.open(me_req) as resp:
                user_info = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 401:
                raise RuntimeError(
                    "Login failed: invalid username or password"
                ) from e
            raise RuntimeError(f"Login verification failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 3: Create API token
        hostname = socket.gethostname()
        token_body = json.dumps(
            {"name": f"LibreOffice Calc ({hostname})", "expires_in_days": 90}
        ).encode()
        token_req = urllib.request.Request(
            f"{self._origin}/api/auth/tokens",
            data=token_body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        try:
            with opener.open(token_req) as resp:
                token_result = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Failed to create API token (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        raw_token = token_result.get("token", "")
        if not raw_token:
            raise RuntimeError("Server did not return an API token")

        _settings.save_auth(
            username=user_info.get("username", username),
            role=user_info.get("role", ""),
            source=user_info.get("auth_source", ""),
            token=raw_token,
        )
        return {
            "username": user_info.get("username", username),
            "role": user_info.get("role", ""),
            "auth_source": user_info.get("auth_source", ""),
            "token_name": token_result.get("name", ""),
        }

    def logout(self):
        """Forget the stored credentials (local only; no server call)."""
        _settings.clear_auth()

    def is_authenticated(self) -> bool:
        """Return True if a token is available from settings or environment."""
        cfg = _settings.load()
        return bool(cfg.get("api_token") or os.environ.get("SILO_API_TOKEN"))

    def get_current_user(self) -> Optional[Dict[str, Any]]:
        """Return the ``/auth/me`` payload, or ``None`` if the request fails."""
        try:
            return self._request("GET", "/auth/me")
        except RuntimeError:
            return None

    def check_connection(self) -> Tuple[bool, str]:
        """Probe ``<origin>/health`` with a 5-second timeout.

        Returns:
            ``(reachable, message)``. An HTTP error status still counts as
            reachable, since the server answered.
        """
        url = f"{self._origin}/health"
        req = urllib.request.Request(url, method="GET")
        try:
            with urllib.request.urlopen(
                req, context=_get_ssl_context(), timeout=5
            ) as resp:
                return True, f"OK ({resp.status})"
        except urllib.error.HTTPError as e:
            return True, f"Server error ({e.code})"
        except urllib.error.URLError as e:
            return False, str(e.reason)
        except Exception as e:
            return False, str(e)

    # -- Items --------------------------------------------------------------

    def get_item(self, part_number: str) -> Dict[str, Any]:
        """Fetch one item by part number (fully percent-encoded in the path)."""
        return self._request(
            "GET", f"/items/{urllib.parse.quote(part_number, safe='')}"
        )

    def list_items(self, search: str = "", project: str = "", limit: int = 100) -> list:
        """List items, optionally filtered by *search* and/or *project*."""
        params = [f"limit={limit}"]
        if search:
            params.append(f"search={urllib.parse.quote(search)}")
        if project:
            params.append(f"project={urllib.parse.quote(project)}")
        return self._request("GET", "/items?" + "&".join(params))

    def create_item(
        self,
        schema: str,
        category: str,
        description: str = "",
        projects: Optional[List[str]] = None,
        sourcing_type: str = "",
        sourcing_link: str = "",
        standard_cost: Optional[float] = None,
        long_description: str = "",
    ) -> Dict[str, Any]:
        """Create an item.

        ``schema``, ``category`` and ``description`` are always sent; the
        remaining fields are included only when non-empty (``standard_cost``
        when it is not ``None``, so ``0`` is sent).
        """
        data: Dict[str, Any] = {
            "schema": schema,
            "category": category,
            "description": description,
        }
        if projects:
            data["projects"] = projects
        if sourcing_type:
            data["sourcing_type"] = sourcing_type
        if sourcing_link:
            data["sourcing_link"] = sourcing_link
        if standard_cost is not None:
            data["standard_cost"] = standard_cost
        if long_description:
            data["long_description"] = long_description
        return self._request("POST", "/items", data)

    def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
        """PUT the given keyword *fields* as the JSON body for an item."""
        return self._request(
            "PUT", f"/items/{urllib.parse.quote(part_number, safe='')}", fields
        )

    # -- Projects -----------------------------------------------------------

    def get_projects(self) -> list:
        """Return the response of ``GET /projects``."""
        return self._request("GET", "/projects")

    def get_project_items(self, code: str) -> list:
        """Return the items of the project identified by *code*."""
        return self._request(
            "GET", f"/projects/{urllib.parse.quote(code, safe='')}/items"
        )

    def add_item_projects(
        self, part_number: str, project_codes: List[str]
    ) -> Dict[str, Any]:
        """POST *project_codes* to the item's ``/projects`` endpoint."""
        return self._request(
            "POST",
            f"/items/{urllib.parse.quote(part_number, safe='')}/projects",
            {"projects": project_codes},
        )

    def get_item_projects(self, part_number: str) -> list:
        """Return the response of the item's ``/projects`` endpoint."""
        return self._request(
            "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/projects"
        )

    # -- Schemas ------------------------------------------------------------

    def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
        """Fetch the schema called *name*."""
        return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}")

    def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
        """Fetch the ``/properties`` resource of the schema called *name*."""
        return self._request(
            "GET", f"/schemas/{urllib.parse.quote(name, safe='')}/properties"
        )

    # -- BOM ----------------------------------------------------------------

    def get_bom(self, part_number: str) -> list:
        """Return the response of the item's ``/bom`` endpoint."""
        return self._request(
            "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/bom"
        )

    def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
        """Return the item's ``/bom/expanded`` response for the given *depth*."""
        return self._request(
            "GET",
            f"/items/{urllib.parse.quote(part_number, safe='')}/bom/expanded?depth={depth}",
        )

    def add_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        rel_type: str = "component",
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Add *child_pn* to the BOM of *parent_pn*.

        ``quantity`` is sent when not ``None``; ``metadata`` when non-empty.
        """
        data: Dict[str, Any] = {
            "child_part_number": child_pn,
            "rel_type": rel_type,
        }
        if quantity is not None:
            data["quantity"] = quantity
        if metadata:
            data["metadata"] = metadata
        return self._request(
            "POST", f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom", data
        )

    def update_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Update the BOM entry linking *parent_pn* to *child_pn*.

        Only the supplied fields are sent; with neither given the body is an
        empty JSON object.
        """
        data: Dict[str, Any] = {}
        if quantity is not None:
            data["quantity"] = quantity
        if metadata:
            data["metadata"] = metadata
        return self._request(
            "PUT",
            f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom/{urllib.parse.quote(child_pn, safe='')}",
            data,
        )

    # -- Revisions ----------------------------------------------------------

    def get_revisions(self, part_number: str) -> list:
        """Return the response of the item's ``/revisions`` endpoint."""
        return self._request(
            "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/revisions"
        )

    def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
        """Fetch a single *revision* of an item."""
        return self._request(
            "GET",
            f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}",
        )

    # -- ODS endpoints ------------------------------------------------------

    def download_bom_ods(self, part_number: str) -> bytes:
        """Download the item's ``bom/export.ods`` as raw bytes."""
        return self._download(
            f"/items/{urllib.parse.quote(part_number, safe='')}/bom/export.ods"
        )

    def download_project_sheet(self, project_code: str) -> bytes:
        """Download the project's ``sheet.ods`` as raw bytes."""
        return self._download(
            f"/projects/{urllib.parse.quote(project_code, safe='')}/sheet.ods"
        )

    def upload_sheet_diff(
        self, ods_bytes: bytes, filename: str = "sheet.ods"
    ) -> Dict[str, Any]:
        """Upload an ODS file to ``/sheets/diff`` and return the parsed reply."""
        return self._upload_ods("/sheets/diff", ods_bytes, filename)

    # -- Part number generation ---------------------------------------------

    def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
        """POST *schema* and *category* to ``/generate-part-number``."""
        return self._request(
            "POST",
            "/generate-part-number",
            {"schema": schema, "category": category},
        )