Calc extension (pkg/calc/):
- Python UNO ProtocolHandler with 8 toolbar commands
- SiloClient HTTP client adapted from FreeCAD workbench
- Pull BOM/Project: populates sheets with 28-col format, hidden property
columns, row hash tracking, auto project tagging
- Push: row classification, create/update items, conflict detection
- Completion wizard: 3-step category/description/fields with PN conflict
resolution dialog
- OpenRouter AI integration: generate standardized descriptions from seller
text, configurable model/instructions, review dialog
- Settings: JSON persistence, env var fallbacks, OpenRouter fields
- 31 unit tests (no UNO/network required)
Go ODS library (internal/ods/):
- Pure Go ODS read/write (ZIP of XML, no headless LibreOffice)
- Writer, reader, 10 round-trip tests
Server ODS endpoints (internal/api/ods.go):
- GET /api/items/export.ods, template.ods, POST import.ods
- GET /api/items/{pn}/bom/export.ods
- GET /api/projects/{code}/sheet.ods
- POST /api/sheets/diff
Documentation:
- docs/CALC_EXTENSION.md: extension progress report
- docs/COMPONENT_AUDIT.md: web audit tool design with weighted scoring,
assembly computed fields, batch AI assistance plan
448 lines
16 KiB
Python
448 lines
16 KiB
Python
"""Silo API client for LibreOffice Calc extension.
|
|
|
|
Adapted from pkg/freecad/silo_commands.py SiloClient. Uses urllib (no
|
|
external dependencies) and the same auth flow: session login to obtain a
|
|
persistent API token stored in a local JSON settings file.
|
|
"""
|
|
|
|
import http.cookiejar
|
|
import json
|
|
import os
|
|
import socket
|
|
import ssl
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from . import settings as _settings
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSL helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _get_ssl_context() -> ssl.SSLContext:
|
|
"""Build an SSL context honouring the user's verify/cert preferences."""
|
|
cfg = _settings.load()
|
|
if not cfg.get("ssl_verify", True):
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
return ctx
|
|
|
|
ctx = ssl.create_default_context()
|
|
custom_cert = cfg.get("ssl_cert_path", "")
|
|
if custom_cert and os.path.isfile(custom_cert):
|
|
try:
|
|
ctx.load_verify_locations(custom_cert)
|
|
except Exception:
|
|
pass
|
|
# Load system CA bundles (bundled Python may not find them automatically)
|
|
for ca_path in (
|
|
"/etc/ssl/certs/ca-certificates.crt",
|
|
"/etc/pki/tls/certs/ca-bundle.crt",
|
|
):
|
|
if os.path.isfile(ca_path):
|
|
try:
|
|
ctx.load_verify_locations(ca_path)
|
|
except Exception:
|
|
pass
|
|
break
|
|
return ctx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SiloClient
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloClient:
|
|
"""HTTP client for the Silo REST API."""
|
|
|
|
def __init__(self, base_url: str = None):
|
|
self._explicit_url = base_url
|
|
|
|
# -- URL helpers --------------------------------------------------------
|
|
|
|
@property
|
|
def base_url(self) -> str:
|
|
if self._explicit_url:
|
|
return self._explicit_url.rstrip("/")
|
|
cfg = _settings.load()
|
|
url = cfg.get("api_url", "").rstrip("/")
|
|
if not url:
|
|
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
|
|
# Auto-append /api for bare origins
|
|
parsed = urllib.parse.urlparse(url)
|
|
if not parsed.path or parsed.path == "/":
|
|
url = url + "/api"
|
|
return url
|
|
|
|
@property
|
|
def _origin(self) -> str:
|
|
"""Server origin (without /api) for auth endpoints."""
|
|
base = self.base_url
|
|
return base.rsplit("/api", 1)[0] if base.endswith("/api") else base
|
|
|
|
# -- Auth headers -------------------------------------------------------
|
|
|
|
def _auth_headers(self) -> Dict[str, str]:
|
|
token = _settings.load().get("api_token", "") or os.environ.get(
|
|
"SILO_API_TOKEN", ""
|
|
)
|
|
if token:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
return {}
|
|
|
|
# -- Core HTTP ----------------------------------------------------------
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
data: Optional[Dict] = None,
|
|
raw: bool = False,
|
|
) -> Any:
|
|
"""Make an authenticated JSON request. Returns parsed JSON.
|
|
|
|
If *raw* is True the response bytes are returned instead.
|
|
"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {"Content-Type": "application/json"}
|
|
headers.update(self._auth_headers())
|
|
body = json.dumps(data).encode() if data else None
|
|
req = urllib.request.Request(url, data=body, headers=headers, method=method)
|
|
try:
|
|
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
|
|
payload = resp.read()
|
|
if raw:
|
|
return payload
|
|
return json.loads(payload.decode())
|
|
except urllib.error.HTTPError as e:
|
|
if e.code == 401:
|
|
_settings.clear_auth()
|
|
error_body = e.read().decode()
|
|
raise RuntimeError(f"API error {e.code}: {error_body}")
|
|
except urllib.error.URLError as e:
|
|
raise RuntimeError(f"Connection error: {e.reason}")
|
|
|
|
def _download(self, path: str) -> bytes:
|
|
"""Download raw bytes from an API path."""
|
|
url = f"{self.base_url}{path}"
|
|
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
|
|
try:
|
|
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
|
|
return resp.read()
|
|
except urllib.error.HTTPError as e:
|
|
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
|
|
except urllib.error.URLError as e:
|
|
raise RuntimeError(f"Connection error: {e.reason}")
|
|
|
|
def _upload_ods(
|
|
self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
|
|
) -> Any:
|
|
"""POST an ODS file as multipart/form-data."""
|
|
boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:]
|
|
parts = []
|
|
parts.append(
|
|
f"--{boundary}\r\n"
|
|
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
|
|
f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
|
|
)
|
|
parts.append(ods_bytes)
|
|
parts.append(f"\r\n--{boundary}--\r\n")
|
|
|
|
body = b""
|
|
for p in parts:
|
|
body += p.encode("utf-8") if isinstance(p, str) else p
|
|
|
|
url = f"{self.base_url}{path}"
|
|
headers = {
|
|
"Content-Type": f"multipart/form-data; boundary={boundary}",
|
|
"Content-Length": str(len(body)),
|
|
}
|
|
headers.update(self._auth_headers())
|
|
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
|
try:
|
|
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except urllib.error.HTTPError as e:
|
|
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
|
|
except urllib.error.URLError as e:
|
|
raise RuntimeError(f"Connection error: {e.reason}")
|
|
|
|
# -- Authentication -----------------------------------------------------
|
|
|
|
def login(self, username: str, password: str) -> Dict[str, Any]:
|
|
"""Session login + create persistent API token (same flow as FreeCAD)."""
|
|
ctx = _get_ssl_context()
|
|
cookie_jar = http.cookiejar.CookieJar()
|
|
opener = urllib.request.build_opener(
|
|
urllib.request.HTTPCookieProcessor(cookie_jar),
|
|
urllib.request.HTTPSHandler(context=ctx),
|
|
)
|
|
|
|
# Step 1: POST credentials to /login
|
|
login_url = f"{self._origin}/login"
|
|
form_data = urllib.parse.urlencode(
|
|
{"username": username, "password": password}
|
|
).encode()
|
|
req = urllib.request.Request(
|
|
login_url,
|
|
data=form_data,
|
|
method="POST",
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
try:
|
|
opener.open(req)
|
|
except urllib.error.HTTPError as e:
|
|
if e.code not in (302, 303):
|
|
raise RuntimeError(f"Login failed (HTTP {e.code})")
|
|
except urllib.error.URLError as e:
|
|
raise RuntimeError(f"Connection error: {e.reason}")
|
|
|
|
# Step 2: Verify via /api/auth/me
|
|
me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
|
|
try:
|
|
with opener.open(me_req) as resp:
|
|
user_info = json.loads(resp.read().decode())
|
|
except urllib.error.HTTPError as e:
|
|
if e.code == 401:
|
|
raise RuntimeError("Login failed: invalid username or password")
|
|
raise RuntimeError(f"Login verification failed (HTTP {e.code})")
|
|
|
|
# Step 3: Create API token
|
|
hostname = socket.gethostname()
|
|
token_body = json.dumps(
|
|
{"name": f"LibreOffice Calc ({hostname})", "expires_in_days": 90}
|
|
).encode()
|
|
token_req = urllib.request.Request(
|
|
f"{self._origin}/api/auth/tokens",
|
|
data=token_body,
|
|
method="POST",
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
try:
|
|
with opener.open(token_req) as resp:
|
|
token_result = json.loads(resp.read().decode())
|
|
except urllib.error.HTTPError as e:
|
|
raise RuntimeError(f"Failed to create API token (HTTP {e.code})")
|
|
|
|
raw_token = token_result.get("token", "")
|
|
if not raw_token:
|
|
raise RuntimeError("Server did not return an API token")
|
|
|
|
_settings.save_auth(
|
|
username=user_info.get("username", username),
|
|
role=user_info.get("role", ""),
|
|
source=user_info.get("auth_source", ""),
|
|
token=raw_token,
|
|
)
|
|
return {
|
|
"username": user_info.get("username", username),
|
|
"role": user_info.get("role", ""),
|
|
"auth_source": user_info.get("auth_source", ""),
|
|
"token_name": token_result.get("name", ""),
|
|
}
|
|
|
|
def logout(self):
|
|
_settings.clear_auth()
|
|
|
|
def is_authenticated(self) -> bool:
|
|
cfg = _settings.load()
|
|
return bool(cfg.get("api_token") or os.environ.get("SILO_API_TOKEN"))
|
|
|
|
def get_current_user(self) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
return self._request("GET", "/auth/me")
|
|
except RuntimeError:
|
|
return None
|
|
|
|
def check_connection(self) -> Tuple[bool, str]:
|
|
url = f"{self._origin}/health"
|
|
req = urllib.request.Request(url, method="GET")
|
|
try:
|
|
with urllib.request.urlopen(
|
|
req, context=_get_ssl_context(), timeout=5
|
|
) as resp:
|
|
return True, f"OK ({resp.status})"
|
|
except urllib.error.HTTPError as e:
|
|
return True, f"Server error ({e.code})"
|
|
except urllib.error.URLError as e:
|
|
return False, str(e.reason)
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
# -- Items --------------------------------------------------------------
|
|
|
|
def get_item(self, part_number: str) -> Dict[str, Any]:
|
|
return self._request(
|
|
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}"
|
|
)
|
|
|
|
def list_items(self, search: str = "", project: str = "", limit: int = 100) -> list:
|
|
params = [f"limit={limit}"]
|
|
if search:
|
|
params.append(f"search={urllib.parse.quote(search)}")
|
|
if project:
|
|
params.append(f"project={urllib.parse.quote(project)}")
|
|
return self._request("GET", "/items?" + "&".join(params))
|
|
|
|
def create_item(
|
|
self,
|
|
schema: str,
|
|
category: str,
|
|
description: str = "",
|
|
projects: Optional[List[str]] = None,
|
|
sourcing_type: str = "",
|
|
sourcing_link: str = "",
|
|
standard_cost: Optional[float] = None,
|
|
long_description: str = "",
|
|
) -> Dict[str, Any]:
|
|
data: Dict[str, Any] = {
|
|
"schema": schema,
|
|
"category": category,
|
|
"description": description,
|
|
}
|
|
if projects:
|
|
data["projects"] = projects
|
|
if sourcing_type:
|
|
data["sourcing_type"] = sourcing_type
|
|
if sourcing_link:
|
|
data["sourcing_link"] = sourcing_link
|
|
if standard_cost is not None:
|
|
data["standard_cost"] = standard_cost
|
|
if long_description:
|
|
data["long_description"] = long_description
|
|
return self._request("POST", "/items", data)
|
|
|
|
def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
|
|
return self._request(
|
|
"PUT", f"/items/{urllib.parse.quote(part_number, safe='')}", fields
|
|
)
|
|
|
|
# -- Projects -----------------------------------------------------------
|
|
|
|
def get_projects(self) -> list:
|
|
return self._request("GET", "/projects")
|
|
|
|
def get_project_items(self, code: str) -> list:
|
|
return self._request(
|
|
"GET", f"/projects/{urllib.parse.quote(code, safe='')}/items"
|
|
)
|
|
|
|
def add_item_projects(
|
|
self, part_number: str, project_codes: List[str]
|
|
) -> Dict[str, Any]:
|
|
return self._request(
|
|
"POST",
|
|
f"/items/{urllib.parse.quote(part_number, safe='')}/projects",
|
|
{"projects": project_codes},
|
|
)
|
|
|
|
def get_item_projects(self, part_number: str) -> list:
|
|
return self._request(
|
|
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/projects"
|
|
)
|
|
|
|
# -- Schemas ------------------------------------------------------------
|
|
|
|
def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
|
|
return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}")
|
|
|
|
def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
|
|
return self._request(
|
|
"GET", f"/schemas/{urllib.parse.quote(name, safe='')}/properties"
|
|
)
|
|
|
|
# -- BOM ----------------------------------------------------------------
|
|
|
|
def get_bom(self, part_number: str) -> list:
|
|
return self._request(
|
|
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/bom"
|
|
)
|
|
|
|
def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
|
|
return self._request(
|
|
"GET",
|
|
f"/items/{urllib.parse.quote(part_number, safe='')}/bom/expanded?depth={depth}",
|
|
)
|
|
|
|
def add_bom_entry(
|
|
self,
|
|
parent_pn: str,
|
|
child_pn: str,
|
|
quantity: Optional[float] = None,
|
|
rel_type: str = "component",
|
|
metadata: Optional[Dict] = None,
|
|
) -> Dict[str, Any]:
|
|
data: Dict[str, Any] = {
|
|
"child_part_number": child_pn,
|
|
"rel_type": rel_type,
|
|
}
|
|
if quantity is not None:
|
|
data["quantity"] = quantity
|
|
if metadata:
|
|
data["metadata"] = metadata
|
|
return self._request(
|
|
"POST", f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom", data
|
|
)
|
|
|
|
def update_bom_entry(
|
|
self,
|
|
parent_pn: str,
|
|
child_pn: str,
|
|
quantity: Optional[float] = None,
|
|
metadata: Optional[Dict] = None,
|
|
) -> Dict[str, Any]:
|
|
data: Dict[str, Any] = {}
|
|
if quantity is not None:
|
|
data["quantity"] = quantity
|
|
if metadata:
|
|
data["metadata"] = metadata
|
|
return self._request(
|
|
"PUT",
|
|
f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom/{urllib.parse.quote(child_pn, safe='')}",
|
|
data,
|
|
)
|
|
|
|
# -- Revisions ----------------------------------------------------------
|
|
|
|
def get_revisions(self, part_number: str) -> list:
|
|
return self._request(
|
|
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/revisions"
|
|
)
|
|
|
|
def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
|
|
return self._request(
|
|
"GET",
|
|
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}",
|
|
)
|
|
|
|
# -- ODS endpoints ------------------------------------------------------
|
|
|
|
def download_bom_ods(self, part_number: str) -> bytes:
|
|
return self._download(
|
|
f"/items/{urllib.parse.quote(part_number, safe='')}/bom/export.ods"
|
|
)
|
|
|
|
def download_project_sheet(self, project_code: str) -> bytes:
|
|
return self._download(
|
|
f"/projects/{urllib.parse.quote(project_code, safe='')}/sheet.ods"
|
|
)
|
|
|
|
def upload_sheet_diff(
|
|
self, ods_bytes: bytes, filename: str = "sheet.ods"
|
|
) -> Dict[str, Any]:
|
|
return self._upload_ods("/sheets/diff", ods_bytes, filename)
|
|
|
|
# -- Part number generation ---------------------------------------------
|
|
|
|
def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
|
|
return self._request(
|
|
"POST",
|
|
"/generate-part-number",
|
|
{"schema": schema, "category": category},
|
|
)
|