Files
silo-client/silo_client/__init__.py
forbes-0023 9d07de1bca feat(client): add merge_bom_json method for assembly BOM merge
Forward-looking JSON merge endpoint for POST /api/items/{pn}/bom/merge.
Not called by Phase 1 (which uses individual CRUD calls). Ready for
Phase 2 when the server endpoint ships.

Refs: #276
2026-02-19 12:37:04 -06:00

1259 lines
47 KiB
Python

"""Silo API client -- shared HTTP client for Silo REST API.
This package provides ``SiloClient``, a pure-Python HTTP client for the
Silo parts-database server. It uses only ``urllib`` (no external
dependencies) so it can be embedded in FreeCAD workbenches, LibreOffice
extensions, and CLI tools without any packaging overhead.
Consumers supply a ``SiloSettings`` adapter that tells the client where
to find credentials and how to persist authentication state.
"""
import http.cookiejar
import json
import os
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
from ._ssl import build_ssl_context
__version__ = "0.3.0"
# ---------------------------------------------------------------------------
# Settings protocol
# ---------------------------------------------------------------------------
class SiloSettings:
"""Abstract settings interface.
Consumers must subclass this and implement all methods to bridge the
client to their application's configuration storage (FreeCAD
preferences, JSON file, environment variables, etc.).
"""
def get_api_url(self) -> str:
"""Return the Silo API base URL (e.g. ``http://localhost:8080/api``)."""
raise NotImplementedError
def get_api_token(self) -> str:
"""Return the stored API bearer token, or ``""``."""
raise NotImplementedError
def get_ssl_verify(self) -> bool:
"""Return whether to verify server TLS certificates."""
raise NotImplementedError
def get_ssl_cert_path(self) -> str:
"""Return path to a custom CA certificate file, or ``""``."""
raise NotImplementedError
def save_auth(
self, username: str, role: str = "", source: str = "", token: str = ""
):
"""Persist authentication info after a successful login."""
raise NotImplementedError
def clear_auth(self):
"""Remove all stored authentication credentials."""
raise NotImplementedError
def get_schema_name(self) -> str:
"""Return the active part-numbering schema name (default ``"kindred-rd"``)."""
return "kindred-rd"
# ---------------------------------------------------------------------------
# SiloClient
# ---------------------------------------------------------------------------
class SiloClient:
"""HTTP client for the Silo REST API.
Args:
settings: A :class:`SiloSettings` adapter for configuration storage.
base_url: Optional explicit base URL (overrides ``settings``).
"""
def __init__(self, settings: SiloSettings, base_url: str = None):
self._settings = settings
self._explicit_url = base_url
# -- URL helpers --------------------------------------------------------
@property
def base_url(self) -> str:
if self._explicit_url:
return self._explicit_url.rstrip("/")
url = self._settings.get_api_url().rstrip("/")
if not url:
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
# Auto-append /api for bare origins
parsed = urllib.parse.urlparse(url)
if not parsed.path or parsed.path == "/":
url = url + "/api"
return url
@property
def _origin(self) -> str:
"""Server origin (without ``/api``) for auth endpoints."""
base = self.base_url
return base.rsplit("/api", 1)[0] if base.endswith("/api") else base
def _ssl_context(self) -> ssl.SSLContext:
return build_ssl_context(
self._settings.get_ssl_verify(),
self._settings.get_ssl_cert_path(),
)
# -- Auth headers -------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
token = self._settings.get_api_token()
if not token:
token = os.environ.get("SILO_API_TOKEN", "")
if token:
return {"Authorization": f"Bearer {token}"}
return {}
# -- Core HTTP ----------------------------------------------------------
def _request(
self,
method: str,
path: str,
data: Optional[Dict] = None,
raw: bool = False,
) -> Any:
"""Make an authenticated JSON request.
If *raw* is True the response bytes are returned instead.
"""
url = f"{self.base_url}{path}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
payload = resp.read()
if raw:
return payload
return json.loads(payload.decode())
except urllib.error.HTTPError as e:
if e.code == 401:
self._settings.clear_auth()
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download(self, path: str) -> bytes:
"""Download raw bytes from an API path."""
url = f"{self.base_url}{path}"
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return resp.read()
except urllib.error.HTTPError as e:
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download_file(
self,
part_number: str,
revision: int,
dest_path: str,
progress_callback=None,
) -> bool:
"""Download a revision file from the server.
Args:
progress_callback: Optional ``callable(bytes_downloaded, total_bytes)``.
*total_bytes* is ``-1`` when the server omits Content-Length.
"""
url = f"{self.base_url}/items/{part_number}/file/{revision}"
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
total = int(resp.headers.get("Content-Length", -1))
downloaded = 0
with open(dest_path, "wb") as f:
while True:
chunk = resp.read(8192)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if progress_callback is not None:
progress_callback(downloaded, total)
return True
except urllib.error.HTTPError as e:
if e.code == 404:
return False
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_file(
self, part_number: str, file_path: str, properties: Dict, comment: str = ""
) -> Dict[str, Any]:
"""Upload a file and create a new revision."""
import mimetypes
url = f"{self.base_url}/items/{part_number}/file"
with open(file_path, "rb") as f:
file_data = f.read()
boundary = "----SiloUploadBoundary" + str(hash(file_path))[-8:]
body_parts: list = []
filename = os.path.basename(file_path)
content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
body_parts.append(
f'--{boundary}\r\nContent-Disposition: form-data; name="file"; '
f'filename="{filename}"\r\nContent-Type: {content_type}\r\n\r\n'
)
body_parts.append(file_data)
body_parts.append(b"\r\n")
if comment:
body_parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="comment"\r\n\r\n'
f"{comment}\r\n"
)
if properties:
props_json = json.dumps(properties, allow_nan=False, default=str)
body_parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="properties"\r\n\r\n'
f"{props_json}\r\n"
)
body_parts.append(f"--{boundary}--\r\n")
body = b""
for part in body_parts:
body += part.encode("utf-8") if isinstance(part, str) else part
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_ods(
self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
) -> Any:
"""POST an ODS file as multipart/form-data."""
boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:]
parts: list = []
parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
)
parts.append(ods_bytes)
parts.append(f"\r\n--{boundary}--\r\n")
body = b""
for p in parts:
body += p.encode("utf-8") if isinstance(p, str) else p
url = f"{self.base_url}{path}"
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_csv(
self, path: str, csv_bytes: bytes, filename: str = "import.csv"
) -> Any:
"""POST a CSV file as multipart/form-data."""
boundary = "----SiloCsvUpload" + str(abs(hash(filename)))[-8:]
parts: list = []
parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: text/csv\r\n\r\n"
)
parts.append(csv_bytes)
parts.append(f"\r\n--{boundary}--\r\n")
body = b""
for p in parts:
body += p.encode("utf-8") if isinstance(p, str) else p
url = f"{self.base_url}{path}"
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Authentication -----------------------------------------------------
def login(
self, username: str, password: str, token_name: str = ""
) -> Dict[str, Any]:
"""Session login and create a persistent API token.
Performs: ``POST /login`` (session) -> ``GET /api/auth/me`` (verify)
-> ``POST /api/auth/tokens`` (create token). The token is persisted
via ``settings.save_auth()``.
Args:
token_name: Human-readable label for the token. Defaults to
the local hostname.
"""
ctx = self._ssl_context()
cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(cookie_jar),
urllib.request.HTTPSHandler(context=ctx),
)
# Step 1: POST credentials to /login
login_url = f"{self._origin}/login"
form_data = urllib.parse.urlencode(
{"username": username, "password": password}
).encode()
req = urllib.request.Request(
login_url,
data=form_data,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
opener.open(req)
except urllib.error.HTTPError as e:
if e.code not in (302, 303):
raise RuntimeError(f"Login failed (HTTP {e.code})")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# Step 2: Verify session via /api/auth/me
me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
try:
with opener.open(me_req) as resp:
user_info = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
raise RuntimeError("Login failed: invalid username or password")
raise RuntimeError(f"Login verification failed (HTTP {e.code})")
# Step 3: Create a persistent API token
if not token_name:
token_name = socket.gethostname()
token_body = json.dumps({"name": token_name, "expires_in_days": 90}).encode()
token_req = urllib.request.Request(
f"{self._origin}/api/auth/tokens",
data=token_body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with opener.open(token_req) as resp:
token_result = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Failed to create API token (HTTP {e.code})")
raw_token = token_result.get("token", "")
if not raw_token:
raise RuntimeError("Server did not return an API token")
self._settings.save_auth(
username=user_info.get("username", username),
role=user_info.get("role", ""),
source=user_info.get("auth_source", ""),
token=raw_token,
)
return {
"username": user_info.get("username", username),
"role": user_info.get("role", ""),
"auth_source": user_info.get("auth_source", ""),
"token_name": token_result.get("name", ""),
"token_prefix": token_result.get("token_prefix", ""),
}
def logout(self):
"""Clear stored API token and authentication info."""
self._settings.clear_auth()
def is_authenticated(self) -> bool:
"""Return True if a valid API token is configured."""
return bool(self._settings.get_api_token() or os.environ.get("SILO_API_TOKEN"))
def get_current_user(self) -> Optional[Dict[str, Any]]:
"""Fetch the current user info from the server."""
try:
return self._request("GET", "/auth/me")
except RuntimeError:
return None
def list_tokens(self) -> List[Dict[str, Any]]:
"""List API tokens for the current user."""
return self._request("GET", "/auth/tokens")
def create_token(
self, name: str, expires_in_days: Optional[int] = None
) -> Dict[str, Any]:
"""Create a new API token."""
data: Dict[str, Any] = {"name": name}
if expires_in_days is not None:
data["expires_in_days"] = expires_in_days
return self._request("POST", "/auth/tokens", data)
def revoke_token(self, token_id: str) -> None:
"""Revoke an API token by its ID."""
url = f"{self.base_url}/auth/tokens/{token_id}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
urllib.request.urlopen(req, context=self._ssl_context())
except urllib.error.HTTPError as e:
raise RuntimeError(f"API error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def check_connection(self) -> Tuple[bool, str]:
"""Check connectivity to the Silo server.
Returns ``(reachable, message)``.
"""
url = f"{self._origin}/health"
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(
req, context=self._ssl_context(), timeout=5
) as resp:
return True, f"OK ({resp.status})"
except urllib.error.HTTPError as e:
return True, f"Server error ({e.code})"
except urllib.error.URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
# -- Items --------------------------------------------------------------
def get_item(self, part_number: str) -> Dict[str, Any]:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}"
)
def list_items(
self,
search: str = "",
item_type: str = "",
project: str = "",
limit: int = 100,
) -> list:
params = [f"limit={limit}"]
if search:
params.append(f"search={urllib.parse.quote(search)}")
if item_type:
params.append(f"type={item_type}")
if project:
params.append(f"project={urllib.parse.quote(project)}")
return self._request("GET", "/items?" + "&".join(params))
def create_item(
self,
schema: str,
category: str,
description: str = "",
projects: Optional[List[str]] = None,
sourcing_type: str = "",
sourcing_link: str = "",
standard_cost: Optional[float] = None,
long_description: str = "",
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"schema": schema,
"category": category,
"description": description,
}
if projects:
data["projects"] = projects
if sourcing_type:
data["sourcing_type"] = sourcing_type
if sourcing_link:
data["sourcing_link"] = sourcing_link
if standard_cost is not None:
data["standard_cost"] = standard_cost
if long_description:
data["long_description"] = long_description
return self._request("POST", "/items", data)
def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
return self._request(
"PUT",
f"/items/{urllib.parse.quote(part_number, safe='')}",
fields,
)
def search_items(self, query: str, limit: int = 100, offset: int = 0) -> list:
"""Full-text search across items."""
q = urllib.parse.quote(query)
return self._request(
"GET", f"/items/search?q={q}&limit={limit}&offset={offset}"
)
def get_item_by_uuid(self, uuid: str) -> Dict[str, Any]:
"""Get an item by its internal UUID."""
return self._request(
"GET", f"/items/by-uuid/{urllib.parse.quote(uuid, safe='')}"
)
def delete_item(self, part_number: str) -> None:
"""Delete an item by part number."""
pn = urllib.parse.quote(part_number, safe="")
self._request("DELETE", f"/items/{pn}", raw=True)
def export_items_csv(self, project: str = "", category: str = "") -> bytes:
"""Export items as CSV."""
params: list = []
if project:
params.append(f"project={urllib.parse.quote(project)}")
if category:
params.append(f"category={urllib.parse.quote(category)}")
path = "/items/export.csv"
if params:
path += "?" + "&".join(params)
return self._download(path)
def download_items_csv_template(self) -> bytes:
"""Download the CSV import template for items."""
return self._download("/items/template.csv")
def export_items_ods(self, project: str = "", category: str = "") -> bytes:
"""Export items as ODS spreadsheet."""
params: list = []
if project:
params.append(f"project={urllib.parse.quote(project)}")
if category:
params.append(f"category={urllib.parse.quote(category)}")
path = "/items/export.ods"
if params:
path += "?" + "&".join(params)
return self._download(path)
def download_items_ods_template(self) -> bytes:
"""Download the ODS import template for items."""
return self._download("/items/template.ods")
def import_items_csv(self, csv_bytes: bytes, filename: str = "import.csv") -> Any:
"""Import items from a CSV file."""
return self._upload_csv("/items/import", csv_bytes, filename)
def import_items_ods(self, ods_bytes: bytes, filename: str = "import.ods") -> Any:
"""Import items from an ODS spreadsheet."""
return self._upload_ods("/items/import.ods", ods_bytes, filename)
# -- Revisions ----------------------------------------------------------
def get_revisions(self, part_number: str) -> list:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions",
)
def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}",
)
def compare_revisions(
self, part_number: str, from_rev: int, to_rev: int
) -> Dict[str, Any]:
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"GET",
f"/items/{pn}/revisions/compare?from={from_rev}&to={to_rev}",
)
def rollback_revision(
self, part_number: str, revision: int, comment: str = ""
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if comment:
data["comment"] = comment
pn = urllib.parse.quote(part_number, safe="")
return self._request("POST", f"/items/{pn}/revisions/{revision}/rollback", data)
def update_revision(
self,
part_number: str,
revision: int,
status: str = None,
labels: list = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if status:
data["status"] = status
if labels is not None:
data["labels"] = labels
pn = urllib.parse.quote(part_number, safe="")
return self._request("PATCH", f"/items/{pn}/revisions/{revision}", data)
def create_revision(
self,
part_number: str,
properties: Optional[Dict] = None,
comment: str = "",
) -> Dict[str, Any]:
"""Create a new revision for an item."""
data: Dict[str, Any] = {}
if properties:
data["properties"] = properties
if comment:
data["comment"] = comment
pn = urllib.parse.quote(part_number, safe="")
return self._request("POST", f"/items/{pn}/revisions", data)
def has_file(self, part_number: str) -> Tuple[bool, Optional[int]]:
"""Check if any revision of this item has an associated file."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
return True, rev["revision_number"]
return False, None
except Exception:
return False, None
def latest_file_revision(self, part_number: str) -> Optional[Dict]:
"""Return the most recent revision with a file, or ``None``."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
return rev
return None
except Exception:
return None
# -- .kc Metadata -------------------------------------------------------
def get_metadata(self, part_number: str) -> Dict[str, Any]:
"""Get indexed .kc metadata for an item."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/metadata")
def update_metadata(
self, part_number: str, fields: Dict[str, Any]
) -> Dict[str, Any]:
"""Update .kc metadata fields."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("PUT", f"/items/{pn}/metadata", fields)
def patch_lifecycle(self, part_number: str, state: str) -> Dict[str, Any]:
"""Transition lifecycle state."""
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"PATCH", f"/items/{pn}/metadata/lifecycle", {"state": state}
)
def patch_tags(self, part_number: str, tags: list) -> Dict[str, Any]:
"""Add/remove tags on .kc metadata."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("PATCH", f"/items/{pn}/metadata/tags", {"tags": tags})
# -- .kc Dependencies ---------------------------------------------------
def resolve_dependencies(self, part_number: str) -> list:
"""Resolve dependency UUIDs to part numbers and file availability."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/dependencies/resolve")
# -- BOM / Relationships ------------------------------------------------
def get_bom(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom")
def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/expanded?depth={depth}")
def get_bom_where_used(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/where-used")
def add_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
unit: str = None,
rel_type: str = "component",
ref_des: list = None,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"child_part_number": child_pn,
"rel_type": rel_type,
}
if quantity is not None:
data["quantity"] = quantity
if unit:
data["unit"] = unit
if ref_des:
data["reference_designators"] = ref_des
if metadata:
data["metadata"] = metadata
pn = urllib.parse.quote(parent_pn, safe="")
return self._request("POST", f"/items/{pn}/bom", data)
def update_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
unit: str = None,
rel_type: str = None,
ref_des: list = None,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if quantity is not None:
data["quantity"] = quantity
if unit is not None:
data["unit"] = unit
if rel_type is not None:
data["rel_type"] = rel_type
if ref_des is not None:
data["reference_designators"] = ref_des
if metadata:
data["metadata"] = metadata
ppn = urllib.parse.quote(parent_pn, safe="")
cpn = urllib.parse.quote(child_pn, safe="")
return self._request("PUT", f"/items/{ppn}/bom/{cpn}", data)
def delete_bom_entry(self, parent_pn: str, child_pn: str) -> None:
ppn = urllib.parse.quote(parent_pn, safe="")
cpn = urllib.parse.quote(child_pn, safe="")
self._request("DELETE", f"/items/{ppn}/bom/{cpn}", raw=True)
def get_bom_flat(self, part_number: str) -> list:
"""Get a flattened BOM for an item."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/flat")
def get_bom_cost(self, part_number: str) -> Dict[str, Any]:
"""Get cost rollup for an item's BOM."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/cost")
def export_bom_csv(self, part_number: str) -> bytes:
"""Export a BOM as CSV."""
pn = urllib.parse.quote(part_number, safe="")
return self._download(f"/items/{pn}/bom/export.csv")
def import_bom_csv(
self, part_number: str, csv_bytes: bytes, filename: str = "bom.csv"
) -> Any:
"""Import a BOM from a CSV file."""
pn = urllib.parse.quote(part_number, safe="")
return self._upload_csv(f"/items/{pn}/bom/import", csv_bytes, filename)
def merge_bom(
self, part_number: str, ods_bytes: bytes, filename: str = "bom.ods"
) -> Any:
"""Merge a BOM from an ODS spreadsheet."""
pn = urllib.parse.quote(part_number, safe="")
return self._upload_ods(f"/items/{pn}/bom/merge", ods_bytes, filename)
def merge_bom_json(self, part_number: str, entries: list) -> Dict[str, Any]:
"""Submit a JSON BOM merge from assembly extraction.
Calls ``POST /api/items/{partNumber}/bom/merge`` with a JSON body
containing assembly-derived BOM entries. The server applies adds
and quantity changes, flags removed items as unreferenced, and
returns the diff.
Not used by Phase 1 (which calls add/update individually).
Ready for Phase 2 when the server endpoint ships.
"""
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"POST",
f"/items/{pn}/bom/merge",
{"source": "assembly", "entries": entries},
)
# -- Schemas ------------------------------------------------------------
def list_schemas(self) -> list:
"""List all available schemas."""
return self._request("GET", "/schemas")
def get_schema(self, name: str = "") -> Dict[str, Any]:
if not name:
name = self._settings.get_schema_name()
return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}")
def get_schema_form(self, name: str = "") -> Dict[str, Any]:
"""Get form descriptor for a schema (field groups, widgets, category picker)."""
if not name:
name = self._settings.get_schema_name()
return self._request(
"GET",
f"/schemas/{urllib.parse.quote(name, safe='')}/form",
)
def get_property_schema(self, name: str = "", category: str = "") -> Dict[str, Any]:
if not name:
name = self._settings.get_schema_name()
path = f"/schemas/{urllib.parse.quote(name, safe='')}/properties"
if category:
path += f"?category={urllib.parse.quote(category)}"
return self._request("GET", path)
def add_enum_value(
self, schema: str, segment: str, code: str, label: str = ""
) -> Dict[str, Any]:
"""Add a value to an enum segment in a schema."""
s = urllib.parse.quote(schema, safe="")
seg = urllib.parse.quote(segment, safe="")
data: Dict[str, Any] = {"code": code}
if label:
data["label"] = label
return self._request("POST", f"/schemas/{s}/segments/{seg}/values", data)
def update_enum_value(
self, schema: str, segment: str, code: str, **fields
) -> Dict[str, Any]:
"""Update an enum value in a schema segment."""
s = urllib.parse.quote(schema, safe="")
seg = urllib.parse.quote(segment, safe="")
c = urllib.parse.quote(code, safe="")
return self._request("PUT", f"/schemas/{s}/segments/{seg}/values/{c}", fields)
def delete_enum_value(self, schema: str, segment: str, code: str) -> None:
"""Delete an enum value from a schema segment."""
s = urllib.parse.quote(schema, safe="")
seg = urllib.parse.quote(segment, safe="")
c = urllib.parse.quote(code, safe="")
self._request("DELETE", f"/schemas/{s}/segments/{seg}/values/{c}", raw=True)
# -- Projects -----------------------------------------------------------
def get_projects(self) -> list:
return self._request("GET", "/projects")
def get_project(self, code: str) -> Dict[str, Any]:
"""Get a single project by code."""
return self._request("GET", f"/projects/{urllib.parse.quote(code, safe='')}")
def create_project(
self, code: str, name: str, description: str = ""
) -> Dict[str, Any]:
"""Create a new project."""
data: Dict[str, Any] = {"code": code, "name": name}
if description:
data["description"] = description
return self._request("POST", "/projects", data)
def get_project_items(self, code: str) -> list:
return self._request(
"GET", f"/projects/{urllib.parse.quote(code, safe='')}/items"
)
def get_item_projects(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/projects")
def add_item_projects(
self, part_number: str, project_codes: List[str]
) -> Dict[str, Any]:
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"POST", f"/items/{pn}/projects", {"projects": project_codes}
)
def update_project(self, code: str, **fields) -> Dict[str, Any]:
"""Update a project's fields."""
return self._request(
"PUT",
f"/projects/{urllib.parse.quote(code, safe='')}",
fields,
)
def delete_project(self, code: str) -> None:
"""Delete a project."""
self._request(
"DELETE", f"/projects/{urllib.parse.quote(code, safe='')}", raw=True
)
def remove_item_project(self, part_number: str, project_code: str) -> None:
"""Remove a project tag from an item."""
pn = urllib.parse.quote(part_number, safe="")
code = urllib.parse.quote(project_code, safe="")
self._request("DELETE", f"/items/{pn}/projects/{code}", raw=True)
# -- Part number generation ---------------------------------------------
def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
return self._request(
"POST",
"/generate-part-number",
{"schema": schema, "category": category},
)
# -- ODS endpoints ------------------------------------------------------
def download_bom_ods(self, part_number: str) -> bytes:
pn = urllib.parse.quote(part_number, safe="")
return self._download(f"/items/{pn}/bom/export.ods")
def download_project_sheet(self, project_code: str) -> bytes:
code = urllib.parse.quote(project_code, safe="")
return self._download(f"/projects/{code}/sheet.ods")
def upload_sheet_diff(
self, ods_bytes: bytes, filename: str = "sheet.ods"
) -> Dict[str, Any]:
return self._upload_ods("/sheets/diff", ods_bytes, filename)
# -- Files / Attachments ------------------------------------------------
def presign_upload(self, filename: str, content_type: str = "") -> Dict[str, Any]:
"""Get a pre-signed upload URL for a file."""
data: Dict[str, Any] = {"filename": filename}
if content_type:
data["content_type"] = content_type
return self._request("POST", "/uploads/presign", data)
def list_attachments(self, part_number: str) -> list:
"""List file attachments on an item."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/files")
def add_attachment(
self,
part_number: str,
upload_key: str,
filename: str,
content_type: str = "",
description: str = "",
) -> Dict[str, Any]:
"""Attach an uploaded file to an item."""
pn = urllib.parse.quote(part_number, safe="")
data: Dict[str, Any] = {"upload_key": upload_key, "filename": filename}
if content_type:
data["content_type"] = content_type
if description:
data["description"] = description
return self._request("POST", f"/items/{pn}/files", data)
def delete_attachment(self, part_number: str, file_id: str) -> None:
"""Delete a file attachment from an item."""
pn = urllib.parse.quote(part_number, safe="")
fid = urllib.parse.quote(file_id, safe="")
self._request("DELETE", f"/items/{pn}/files/{fid}", raw=True)
def set_thumbnail(
self,
part_number: str,
image_bytes: bytes,
content_type: str = "image/png",
) -> None:
"""Set the thumbnail image for an item (binary PUT)."""
pn = urllib.parse.quote(part_number, safe="")
url = f"{self.base_url}/items/{pn}/thumbnail"
headers = {
"Content-Type": content_type,
"Content-Length": str(len(image_bytes)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(
url, data=image_bytes, headers=headers, method="PUT"
)
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
resp.read()
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Audit --------------------------------------------------------------
def get_audit_completeness(
self,
project: str = "",
category: str = "",
min_score: Optional[float] = None,
max_score: Optional[float] = None,
sort: str = "",
limit: int = 100,
offset: int = 0,
) -> list:
"""Get component audit completeness scores."""
params = [f"limit={limit}", f"offset={offset}"]
if project:
params.append(f"project={urllib.parse.quote(project)}")
if category:
params.append(f"category={urllib.parse.quote(category)}")
if min_score is not None:
params.append(f"min_score={min_score}")
if max_score is not None:
params.append(f"max_score={max_score}")
if sort:
params.append(f"sort={urllib.parse.quote(sort)}")
return self._request("GET", "/audit/completeness?" + "&".join(params))
def get_audit_item_completeness(self, part_number: str) -> Dict[str, Any]:
"""Get audit completeness details for a single item."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/audit/completeness/{pn}")
# -- DAG (feature tree) -------------------------------------------------
def push_dag(
self,
part_number: str,
revision_number: int,
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Push a feature DAG to the server.
Calls ``PUT /api/items/{partNumber}/dag`` with the payload
described in ``MULTI_USER_CLIENT.md`` Section 2.
"""
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"PUT",
f"/items/{pn}/dag",
{
"revision_number": revision_number,
"nodes": nodes,
"edges": edges,
},
)
def get_dag(
self, part_number: str, revision_number: Optional[int] = None
) -> Dict[str, Any]:
"""Fetch the stored DAG for an item.
Calls ``GET /api/items/{partNumber}/dag``, optionally filtered
by *revision_number*.
"""
pn = urllib.parse.quote(part_number, safe="")
path = f"/items/{pn}/dag"
if revision_number is not None:
path += f"?revision={revision_number}"
return self._request("GET", path)
def get_dag_forward_cone(self, part_number: str, node_key: str) -> Dict[str, Any]:
"""Get the forward cone (downstream dependents) of a DAG node."""
pn = urllib.parse.quote(part_number, safe="")
nk = urllib.parse.quote(node_key, safe="")
return self._request("GET", f"/items/{pn}/dag/forward-cone/{nk}")
def get_dag_dirty(self, part_number: str) -> Dict[str, Any]:
"""Get the dirty subgraph of an item's DAG."""
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/dag/dirty")
def mark_dag_dirty(self, part_number: str, node_key: str) -> Dict[str, Any]:
"""Mark a DAG node as dirty (needs recomputation)."""
pn = urllib.parse.quote(part_number, safe="")
nk = urllib.parse.quote(node_key, safe="")
return self._request("POST", f"/items/{pn}/dag/mark-dirty/{nk}")
# -- Jobs ---------------------------------------------------------------
def list_jobs(
self,
status: str = "",
item_id: str = "",
definition: str = "",
limit: int = 100,
) -> list:
"""List jobs, optionally filtered by status, item, or definition."""
params = [f"limit={limit}"]
if status:
params.append(f"status={urllib.parse.quote(status)}")
if item_id:
params.append(f"item_id={urllib.parse.quote(item_id)}")
if definition:
params.append(f"definition={urllib.parse.quote(definition)}")
return self._request("GET", "/jobs?" + "&".join(params))
def get_job(self, job_id: str) -> Dict[str, Any]:
"""Get details for a specific job."""
return self._request("GET", f"/jobs/{urllib.parse.quote(job_id, safe='')}")
def get_job_logs(self, job_id: str) -> list:
"""Get log entries for a job."""
return self._request("GET", f"/jobs/{urllib.parse.quote(job_id, safe='')}/logs")
def trigger_job(
self,
definition_name: str,
part_number: str = "",
item_id: str = "",
) -> Dict[str, Any]:
"""Manually trigger a job.
Either *part_number* or *item_id* should identify the target item.
"""
data: Dict[str, Any] = {"definition_name": definition_name}
if part_number:
data["part_number"] = part_number
if item_id:
data["item_id"] = item_id
return self._request("POST", "/jobs", data)
def cancel_job(self, job_id: str) -> Dict[str, Any]:
"""Cancel a pending or running job."""
return self._request(
"POST", f"/jobs/{urllib.parse.quote(job_id, safe='')}/cancel"
)
# -- Job Definitions ----------------------------------------------------
def list_job_definitions(self) -> list:
"""List all loaded job definitions."""
return self._request("GET", "/job-definitions")
def get_job_definition(self, name: str) -> Dict[str, Any]:
"""Get a specific job definition by name."""
return self._request(
"GET", f"/job-definitions/{urllib.parse.quote(name, safe='')}"
)
def reload_job_definitions(self) -> Dict[str, Any]:
"""Re-read YAML job definitions from disk (admin only)."""
return self._request("POST", "/job-definitions/reload")
# -- Runners (admin) ----------------------------------------------------
def list_runners(self) -> list:
"""List all registered runners."""
return self._request("GET", "/runners")
def register_runner(self, name: str, tags: List[str]) -> Dict[str, Any]:
"""Register a new runner. Returns the token (shown once)."""
return self._request("POST", "/runners", {"name": name, "tags": tags})
def delete_runner(self, runner_id: str) -> None:
"""Delete a registered runner."""
self._request(
"DELETE", f"/runners/{urllib.parse.quote(runner_id, safe='')}", raw=True
)
# -- Runner-facing ------------------------------------------------------
def runner_heartbeat(self) -> Dict[str, Any]:
"""Send a heartbeat from the runner to the server."""
return self._request("POST", "/runner/heartbeat")
def runner_claim_job(self) -> Optional[Dict[str, Any]]:
"""Attempt to claim the next available job for this runner."""
return self._request("POST", "/runner/claim")
def runner_update_progress(
self, job_id: str, progress: float, message: str = ""
) -> Dict[str, Any]:
"""Report progress on a running job."""
jid = urllib.parse.quote(job_id, safe="")
data: Dict[str, Any] = {"progress": progress}
if message:
data["message"] = message
return self._request("PUT", f"/runner/jobs/{jid}/progress", data)
def runner_complete_job(
self, job_id: str, result: Optional[Dict] = None
) -> Dict[str, Any]:
"""Mark a job as successfully completed."""
jid = urllib.parse.quote(job_id, safe="")
data: Dict[str, Any] = {}
if result is not None:
data["result"] = result
return self._request("POST", f"/runner/jobs/{jid}/complete", data)
def runner_fail_job(self, job_id: str, error_message: str = "") -> Dict[str, Any]:
"""Mark a job as failed."""
jid = urllib.parse.quote(job_id, safe="")
data: Dict[str, Any] = {}
if error_message:
data["error"] = error_message
return self._request("POST", f"/runner/jobs/{jid}/fail", data)
def runner_append_log(
self,
job_id: str,
level: str,
message: str,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
"""Append a log entry to a running job."""
jid = urllib.parse.quote(job_id, safe="")
data: Dict[str, Any] = {"level": level, "message": message}
if metadata:
data["metadata"] = metadata
return self._request("POST", f"/runner/jobs/{jid}/log", data)
def runner_push_dag(
self,
job_id: str,
revision_number: int,
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Push a feature DAG from a runner job."""
jid = urllib.parse.quote(job_id, safe="")
return self._request(
"PUT",
f"/runner/jobs/{jid}/dag",
{
"revision_number": revision_number,
"nodes": nodes,
"edges": edges,
},
)