Add get_schema_name() to SiloSettings base class with default 'kindred-rd'. Update get_schema(), get_schema_form(), and get_property_schema() to fall back to settings instead of hardcoding the schema name. Add category query parameter support to get_property_schema(). Closes #28
825 lines
31 KiB
Python
825 lines
31 KiB
Python
"""Silo API client -- shared HTTP client for Silo REST API.
|
|
|
|
This package provides ``SiloClient``, a pure-Python HTTP client for the
Silo parts-database server.  It uses only ``urllib`` (no external
dependencies) so it can be embedded in FreeCAD workbenches, LibreOffice
extensions, and CLI tools without any packaging overhead.

Consumers supply a ``SiloSettings`` adapter that tells the client where
to find credentials and how to persist authentication state.
|
|
"""
|
|
|
|
import http.cookiejar
|
|
import json
|
|
import os
|
|
import socket
|
|
import ssl
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from ._ssl import build_ssl_context
|
|
|
|
__version__ = "0.2.0"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings protocol
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloSettings:
    """Configuration adapter consumed by the Silo client.

    Subclass this and override the methods below to connect the client to
    wherever the host application keeps its configuration (FreeCAD
    preferences, a JSON file, environment variables, ...).  Every method
    except :meth:`get_schema_name` raises ``NotImplementedError`` until it
    is overridden.
    """

    def get_api_url(self) -> str:
        """Give the base URL of the Silo API, e.g. ``http://localhost:8080/api``."""
        raise NotImplementedError()

    def get_api_token(self) -> str:
        """Give the saved bearer token; ``""`` when none is stored."""
        raise NotImplementedError()

    def get_ssl_verify(self) -> bool:
        """Tell whether the server's TLS certificate should be verified."""
        raise NotImplementedError()

    def get_ssl_cert_path(self) -> str:
        """Give the path of a custom CA certificate file; ``""`` when unused."""
        raise NotImplementedError()

    def save_auth(self, username: str, role: str = "", source: str = "", token: str = ""):
        """Store the authentication details obtained from a successful login."""
        raise NotImplementedError()

    def clear_auth(self):
        """Forget every stored authentication credential."""
        raise NotImplementedError()

    def get_schema_name(self) -> str:
        """Give the active part-numbering schema name.

        The base implementation answers ``"kindred-rd"``; override it to
        select a different schema.
        """
        return "kindred-rd"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SiloClient
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloClient:
    """HTTP client for the Silo REST API.

    Each public method issues one request against the API.  HTTP error
    responses and connection failures are re-raised as ``RuntimeError``
    carrying the status code and response body (or the failure reason).

    Args:
        settings: A :class:`SiloSettings` adapter for configuration storage.
        base_url: Optional explicit base URL (overrides ``settings``).
    """

    def __init__(self, settings: "SiloSettings", base_url: Optional[str] = None):
        self._settings = settings
        self._explicit_url = base_url

    # -- URL helpers --------------------------------------------------------

    @property
    def base_url(self) -> str:
        """API base URL without a trailing slash.

        Resolution order: the explicit constructor URL, then
        ``settings.get_api_url()``, then the ``SILO_API_URL`` environment
        variable, then ``http://localhost:8080/api``.  A URL taken from
        settings or the environment that has no path gets ``/api`` appended;
        an explicit URL is only stripped of trailing slashes.
        """
        if self._explicit_url:
            return self._explicit_url.rstrip("/")
        url = (self._settings.get_api_url() or "").rstrip("/")
        if not url:
            # Normalise the fallback as well, otherwise "http://host/" would
            # turn into "http://host//api".
            url = os.environ.get("SILO_API_URL") or "http://localhost:8080/api"
            url = url.rstrip("/")
        # Auto-append /api for bare origins
        if not urllib.parse.urlparse(url).path:
            url += "/api"
        return url

    @property
    def _origin(self) -> str:
        """Server origin (without ``/api``) for auth endpoints."""
        base = self.base_url
        suffix = "/api"
        return base[: -len(suffix)] if base.endswith(suffix) else base

    def _ssl_context(self) -> ssl.SSLContext:
        """Build the TLS context from the verify flag and CA path in settings."""
        return build_ssl_context(
            self._settings.get_ssl_verify(),
            self._settings.get_ssl_cert_path(),
        )

    @staticmethod
    def _quote(value: str) -> str:
        """Percent-encode *value* so it is safe as a single URL path segment."""
        return urllib.parse.quote(value, safe="")

    @staticmethod
    def _header_param(value: str) -> str:
        """Escape *value* for use inside a double-quoted header parameter.

        CR/LF are removed so the value cannot start a new header line;
        backslashes and double quotes are backslash-escaped.
        """
        cleaned = value.replace("\r", "").replace("\n", "")
        return cleaned.replace("\\", "\\\\").replace('"', '\\"')

    @staticmethod
    def _error_text(err: urllib.error.HTTPError) -> str:
        """Read an HTTP error body as text, replacing undecodable bytes."""
        return err.read().decode("utf-8", errors="replace")

    # -- Auth headers -------------------------------------------------------

    def _auth_headers(self) -> Dict[str, str]:
        """Return the ``Authorization`` header, or ``{}`` when no token is known.

        The token comes from settings, falling back to ``SILO_API_TOKEN``.
        """
        token = self._settings.get_api_token() or os.environ.get("SILO_API_TOKEN", "")
        return {"Authorization": f"Bearer {token}"} if token else {}

    # -- Core HTTP ----------------------------------------------------------

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None,
        raw: bool = False,
    ) -> Any:
        """Make an authenticated JSON request.

        Args:
            method: HTTP verb.
            path: Path appended to :attr:`base_url` (must start with ``/``).
            data: JSON-serialisable body.  ``None`` sends no body; an empty
                dict is sent as ``{}``.
            raw: When True the response bytes are returned undecoded.

        Returns:
            The decoded JSON document, the raw bytes when *raw* is set, or
            ``None`` when the server answered with an empty body.

        Raises:
            RuntimeError: On an HTTP error status or a connection failure.
                A 401 additionally clears the stored credentials.
        """
        url = f"{self.base_url}{path}"
        headers = {"Content-Type": "application/json"}
        headers.update(self._auth_headers())
        body = json.dumps(data).encode() if data is not None else None
        req = urllib.request.Request(url, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
                payload = resp.read()
        except urllib.error.HTTPError as e:
            if e.code == 401:
                self._settings.clear_auth()
            raise RuntimeError(f"API error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e
        if raw:
            return payload
        if not payload.strip():
            # e.g. a 204 reply: there is nothing to decode.
            return None
        return json.loads(payload.decode())

    def _download(self, path: str) -> bytes:
        """Download raw bytes from an API path."""
        req = urllib.request.Request(
            f"{self.base_url}{path}", headers=self._auth_headers(), method="GET"
        )
        try:
            with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
                return resp.read()
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Download error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    def _download_file(
        self,
        part_number: str,
        revision: int,
        dest_path: str,
        progress_callback=None,
    ) -> bool:
        """Stream the file of one item revision into *dest_path*.

        Args:
            part_number: Item part number (percent-encoded into the URL).
            revision: Revision number whose file is fetched.
            dest_path: Local path that is opened for binary writing.
            progress_callback: Optional ``callable(bytes_downloaded, total_bytes)``.
                *total_bytes* is ``-1`` when the server omits Content-Length.

        Returns:
            True once the file has been written, False when the server
            answers 404.

        Raises:
            RuntimeError: On any other HTTP error or a connection failure.
        """
        url = f"{self.base_url}/items/{self._quote(part_number)}/file/{revision}"
        req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
        try:
            with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
                total = int(resp.headers.get("Content-Length", -1))
                downloaded = 0
                with open(dest_path, "wb") as f:
                    while True:
                        chunk = resp.read(8192)
                        if not chunk:
                            break
                        f.write(chunk)
                        downloaded += len(chunk)
                        if progress_callback is not None:
                            progress_callback(downloaded, total)
            return True
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return False
            raise RuntimeError(f"Download error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    def _post_multipart(self, url: str, boundary: str, parts: List[Any]) -> Any:
        """POST *parts* as one ``multipart/form-data`` body and decode the JSON reply.

        *parts* holds ``str`` chunks (encoded as UTF-8) and ``bytes`` chunks
        that already include the boundary delimiter lines.

        Raises:
            RuntimeError: On an HTTP error status or a connection failure.
        """
        body = b"".join(p.encode("utf-8") if isinstance(p, str) else p for p in parts)
        headers = {
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Content-Length": str(len(body)),
        }
        headers.update(self._auth_headers())
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        try:
            with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Upload error {e.code}: {self._error_text(e)}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

    def _upload_file(
        self, part_number: str, file_path: str, properties: Dict, comment: str = ""
    ) -> Dict[str, Any]:
        """Upload a file and create a new revision.

        The form carries the file itself, plus a ``comment`` field and a
        JSON-encoded ``properties`` field when those arguments are non-empty.
        """
        import mimetypes

        url = f"{self.base_url}/items/{self._quote(part_number)}/file"

        with open(file_path, "rb") as f:
            file_data = f.read()

        # Random boundary: one derived from the file name is predictable from
        # the name alone.
        boundary = "----SiloUploadBoundary" + os.urandom(8).hex()
        filename = os.path.basename(file_path)
        content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

        parts: List[Any] = [
            f'--{boundary}\r\nContent-Disposition: form-data; name="file"; '
            f'filename="{self._header_param(filename)}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n",
            file_data,
            b"\r\n",
        ]
        if comment:
            parts.append(
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="comment"\r\n\r\n'
                f"{comment}\r\n"
            )
        if properties:
            # default=str stringifies values json cannot encode natively.
            props_json = json.dumps(properties, allow_nan=False, default=str)
            parts.append(
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="properties"\r\n\r\n'
                f"{props_json}\r\n"
            )
        parts.append(f"--{boundary}--\r\n")
        return self._post_multipart(url, boundary, parts)

    def _upload_ods(self, path: str, ods_bytes: bytes, filename: str = "upload.ods") -> Any:
        """POST an ODS file as multipart/form-data."""
        boundary = "----SiloCalcUpload" + os.urandom(8).hex()
        parts: List[Any] = [
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; '
            f'filename="{self._header_param(filename)}"\r\n'
            f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n",
            ods_bytes,
            f"\r\n--{boundary}--\r\n",
        ]
        return self._post_multipart(f"{self.base_url}{path}", boundary, parts)

    # -- Authentication -----------------------------------------------------

    def login(self, username: str, password: str, token_name: str = "") -> Dict[str, Any]:
        """Session login and create a persistent API token.

        Performs: ``POST /login`` (session) -> ``GET /api/auth/me`` (verify)
        -> ``POST /api/auth/tokens`` (create token).  The token is persisted
        via ``settings.save_auth()``.

        Args:
            username: Account name posted to ``/login``.
            password: Account password posted to ``/login``.
            token_name: Human-readable label for the token.  Defaults to
                the local hostname.

        Returns:
            A dict with ``username``, ``role``, ``auth_source``,
            ``token_name`` and ``token_prefix``.

        Raises:
            RuntimeError: When any step fails or no token is returned.
        """
        ctx = self._ssl_context()
        cookie_jar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(cookie_jar),
            urllib.request.HTTPSHandler(context=ctx),
        )
        origin = self._origin

        # Step 1: POST credentials to /login
        form_data = urllib.parse.urlencode({"username": username, "password": password}).encode()
        req = urllib.request.Request(
            f"{origin}/login",
            data=form_data,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        try:
            # Only the session cookie matters; close the response right away.
            with opener.open(req):
                pass
        except urllib.error.HTTPError as e:
            if e.code not in (302, 303):
                raise RuntimeError(f"Login failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 2: Verify session via /api/auth/me
        me_req = urllib.request.Request(f"{origin}/api/auth/me", method="GET")
        try:
            with opener.open(me_req) as resp:
                user_info = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 401:
                raise RuntimeError("Login failed: invalid username or password") from e
            raise RuntimeError(f"Login verification failed (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        # Step 3: Create a persistent API token
        if not token_name:
            token_name = socket.gethostname()
        token_body = json.dumps({"name": token_name, "expires_in_days": 90}).encode()
        token_req = urllib.request.Request(
            f"{origin}/api/auth/tokens",
            data=token_body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        try:
            with opener.open(token_req) as resp:
                token_result = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Failed to create API token (HTTP {e.code})") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Connection error: {e.reason}") from e

        raw_token = token_result.get("token", "")
        if not raw_token:
            raise RuntimeError("Server did not return an API token")

        resolved_user = user_info.get("username", username)
        role = user_info.get("role", "")
        auth_source = user_info.get("auth_source", "")
        self._settings.save_auth(
            username=resolved_user,
            role=role,
            source=auth_source,
            token=raw_token,
        )
        return {
            "username": resolved_user,
            "role": role,
            "auth_source": auth_source,
            "token_name": token_result.get("name", ""),
            "token_prefix": token_result.get("token_prefix", ""),
        }

    def logout(self):
        """Clear stored API token and authentication info."""
        self._settings.clear_auth()

    def is_authenticated(self) -> bool:
        """Return True if a token is configured in settings or ``SILO_API_TOKEN``.

        This is a local check only; the token is not validated with the server.
        """
        return bool(self._settings.get_api_token() or os.environ.get("SILO_API_TOKEN"))

    def get_current_user(self) -> Optional[Dict[str, Any]]:
        """Fetch the current user info, or ``None`` when the request fails."""
        try:
            return self._request("GET", "/auth/me")
        except RuntimeError:
            return None

    def list_tokens(self) -> List[Dict[str, Any]]:
        """List API tokens for the current user."""
        return self._request("GET", "/auth/tokens")

    def create_token(self, name: str, expires_in_days: Optional[int] = None) -> Dict[str, Any]:
        """Create a new API token; *expires_in_days* is sent only when given."""
        data: Dict[str, Any] = {"name": name}
        if expires_in_days is not None:
            data["expires_in_days"] = expires_in_days
        return self._request("POST", "/auth/tokens", data)

    def revoke_token(self, token_id: str) -> None:
        """Revoke an API token by its ID.

        Goes through :meth:`_request`, so the response is closed and a 401
        clears stored credentials like every other API call.
        """
        self._request("DELETE", f"/auth/tokens/{self._quote(token_id)}", raw=True)

    def check_connection(self) -> Tuple[bool, str]:
        """Check connectivity to the Silo server.

        Requests ``/health`` on the server origin with a 5 second timeout.

        Returns ``(reachable, message)``.  An HTTP error status still counts
        as reachable because the server answered.
        """
        req = urllib.request.Request(f"{self._origin}/health", method="GET")
        try:
            with urllib.request.urlopen(req, context=self._ssl_context(), timeout=5) as resp:
                return True, f"OK ({resp.status})"
        except urllib.error.HTTPError as e:
            return True, f"Server error ({e.code})"
        except urllib.error.URLError as e:
            return False, str(e.reason)
        except Exception as e:
            # Best-effort probe: report any other failure instead of raising.
            return False, str(e)

    # -- Items --------------------------------------------------------------

    def get_item(self, part_number: str) -> Dict[str, Any]:
        """Fetch one item by part number."""
        return self._request("GET", f"/items/{self._quote(part_number)}")

    def list_items(
        self,
        search: str = "",
        item_type: str = "",
        project: str = "",
        limit: int = 100,
    ) -> list:
        """List items; empty filters are left out of the query string."""
        params = [f"limit={limit}"]
        if search:
            params.append(f"search={urllib.parse.quote(search)}")
        if item_type:
            params.append(f"type={urllib.parse.quote(item_type)}")
        if project:
            params.append(f"project={urllib.parse.quote(project)}")
        return self._request("GET", "/items?" + "&".join(params))

    def create_item(
        self,
        schema: str,
        category: str,
        description: str = "",
        projects: Optional[List[str]] = None,
        sourcing_type: str = "",
        sourcing_link: str = "",
        standard_cost: Optional[float] = None,
        long_description: str = "",
    ) -> Dict[str, Any]:
        """Create an item.

        ``schema``, ``category`` and ``description`` are always sent; the
        remaining fields are sent only when they carry a value.
        """
        data: Dict[str, Any] = {
            "schema": schema,
            "category": category,
            "description": description,
        }
        if projects:
            data["projects"] = projects
        if sourcing_type:
            data["sourcing_type"] = sourcing_type
        if sourcing_link:
            data["sourcing_link"] = sourcing_link
        if standard_cost is not None:
            data["standard_cost"] = standard_cost
        if long_description:
            data["long_description"] = long_description
        return self._request("POST", "/items", data)

    def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
        """Update an item; the keyword arguments form the JSON body."""
        return self._request("PUT", f"/items/{self._quote(part_number)}", fields)

    # -- Revisions ----------------------------------------------------------

    def get_revisions(self, part_number: str) -> list:
        """List the revisions of an item."""
        return self._request("GET", f"/items/{self._quote(part_number)}/revisions")

    def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
        """Fetch a single revision of an item."""
        return self._request("GET", f"/items/{self._quote(part_number)}/revisions/{revision}")

    def compare_revisions(self, part_number: str, from_rev: int, to_rev: int) -> Dict[str, Any]:
        """Ask the server to compare two revisions of an item."""
        pn = self._quote(part_number)
        return self._request(
            "GET",
            f"/items/{pn}/revisions/compare?from={from_rev}&to={to_rev}",
        )

    def rollback_revision(
        self, part_number: str, revision: int, comment: str = ""
    ) -> Dict[str, Any]:
        """Request a rollback to *revision*, with an optional comment."""
        data: Dict[str, Any] = {}
        if comment:
            data["comment"] = comment
        pn = self._quote(part_number)
        return self._request("POST", f"/items/{pn}/revisions/{revision}/rollback", data)

    def update_revision(
        self,
        part_number: str,
        revision: int,
        status: Optional[str] = None,
        labels: Optional[list] = None,
    ) -> Dict[str, Any]:
        """Patch a revision's status and/or labels.

        An empty *labels* list is sent (it is only skipped when ``None``);
        an empty *status* is skipped.
        """
        data: Dict[str, Any] = {}
        if status:
            data["status"] = status
        if labels is not None:
            data["labels"] = labels
        pn = self._quote(part_number)
        return self._request("PATCH", f"/items/{pn}/revisions/{revision}", data)

    def has_file(self, part_number: str) -> Tuple[bool, Optional[int]]:
        """Check if item has files in storage.

        Returns ``(True, revision_number)`` for the first listed revision
        that has a ``file_key``, otherwise ``(False, None)``.
        """
        try:
            for rev in self.get_revisions(part_number):
                if rev.get("file_key"):
                    return True, rev["revision_number"]
        except Exception:
            # Deliberate best-effort: any failure is reported as "no file".
            return False, None
        return False, None

    def latest_file_revision(self, part_number: str) -> Optional[Dict]:
        """Return the first listed revision that has a file, or ``None``.

        NOTE(review): "latest" presumes the server lists revisions newest
        first -- confirm against the API.
        """
        try:
            for rev in self.get_revisions(part_number):
                if rev.get("file_key"):
                    return rev
        except Exception:
            # Deliberate best-effort: any failure is reported as "no file".
            return None
        return None

    # -- BOM / Relationships ------------------------------------------------

    def get_bom(self, part_number: str) -> list:
        """Fetch the BOM of an item."""
        return self._request("GET", f"/items/{self._quote(part_number)}/bom")

    def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
        """Fetch the expanded BOM of an item, passing *depth* to the server."""
        pn = self._quote(part_number)
        return self._request("GET", f"/items/{pn}/bom/expanded?depth={depth}")

    def get_bom_where_used(self, part_number: str) -> list:
        """Fetch the where-used list of an item."""
        return self._request("GET", f"/items/{self._quote(part_number)}/bom/where-used")

    def add_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        unit: Optional[str] = None,
        rel_type: str = "component",
        ref_des: Optional[list] = None,
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Add *child_pn* to the BOM of *parent_pn*.

        Optional fields are sent only when they carry a value.
        """
        data: Dict[str, Any] = {
            "child_part_number": child_pn,
            "rel_type": rel_type,
        }
        if quantity is not None:
            data["quantity"] = quantity
        if unit:
            data["unit"] = unit
        if ref_des:
            data["reference_designators"] = ref_des
        if metadata:
            data["metadata"] = metadata
        return self._request("POST", f"/items/{self._quote(parent_pn)}/bom", data)

    def update_bom_entry(
        self,
        parent_pn: str,
        child_pn: str,
        quantity: Optional[float] = None,
        unit: Optional[str] = None,
        rel_type: Optional[str] = None,
        ref_des: Optional[list] = None,
        metadata: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Update one BOM entry.

        Arguments left at ``None`` are not sent; empty *unit*, *rel_type* and
        *ref_des* values are sent, while empty *metadata* is skipped.
        """
        data: Dict[str, Any] = {}
        if quantity is not None:
            data["quantity"] = quantity
        if unit is not None:
            data["unit"] = unit
        if rel_type is not None:
            data["rel_type"] = rel_type
        if ref_des is not None:
            data["reference_designators"] = ref_des
        if metadata:
            data["metadata"] = metadata
        ppn = self._quote(parent_pn)
        cpn = self._quote(child_pn)
        return self._request("PUT", f"/items/{ppn}/bom/{cpn}", data)

    def delete_bom_entry(self, parent_pn: str, child_pn: str) -> None:
        """Remove *child_pn* from the BOM of *parent_pn*."""
        ppn = self._quote(parent_pn)
        cpn = self._quote(child_pn)
        self._request("DELETE", f"/items/{ppn}/bom/{cpn}", raw=True)

    # -- Schemas ------------------------------------------------------------

    def list_schemas(self) -> list:
        """List all available schemas."""
        return self._request("GET", "/schemas")

    def get_schema(self, name: str = "") -> Dict[str, Any]:
        """Fetch a schema; an empty *name* uses ``settings.get_schema_name()``."""
        if not name:
            name = self._settings.get_schema_name()
        return self._request("GET", f"/schemas/{self._quote(name)}")

    def get_schema_form(self, name: str = "") -> Dict[str, Any]:
        """Get form descriptor for a schema (field groups, widgets, category picker).

        An empty *name* uses ``settings.get_schema_name()``.
        """
        if not name:
            name = self._settings.get_schema_name()
        return self._request("GET", f"/schemas/{self._quote(name)}/form")

    def get_property_schema(self, name: str = "", category: str = "") -> Dict[str, Any]:
        """Fetch the property schema, optionally filtered by *category*.

        An empty *name* uses ``settings.get_schema_name()``.
        """
        if not name:
            name = self._settings.get_schema_name()
        path = f"/schemas/{self._quote(name)}/properties"
        if category:
            path += f"?category={urllib.parse.quote(category)}"
        return self._request("GET", path)

    def add_enum_value(
        self, schema: str, segment: str, code: str, label: str = ""
    ) -> Dict[str, Any]:
        """Add a value to an enum segment in a schema."""
        data: Dict[str, Any] = {"code": code}
        if label:
            data["label"] = label
        s = self._quote(schema)
        seg = self._quote(segment)
        return self._request("POST", f"/schemas/{s}/segments/{seg}/values", data)

    def update_enum_value(self, schema: str, segment: str, code: str, **fields) -> Dict[str, Any]:
        """Update an enum value in a schema segment."""
        s = self._quote(schema)
        seg = self._quote(segment)
        c = self._quote(code)
        return self._request("PUT", f"/schemas/{s}/segments/{seg}/values/{c}", fields)

    def delete_enum_value(self, schema: str, segment: str, code: str) -> None:
        """Delete an enum value from a schema segment."""
        s = self._quote(schema)
        seg = self._quote(segment)
        c = self._quote(code)
        self._request("DELETE", f"/schemas/{s}/segments/{seg}/values/{c}", raw=True)

    # -- Projects -----------------------------------------------------------

    def get_projects(self) -> list:
        """List all projects."""
        return self._request("GET", "/projects")

    def get_project_items(self, code: str) -> list:
        """List the items of the project identified by *code*."""
        return self._request("GET", f"/projects/{self._quote(code)}/items")

    def get_item_projects(self, part_number: str) -> list:
        """List the projects an item belongs to."""
        return self._request("GET", f"/items/{self._quote(part_number)}/projects")

    def add_item_projects(self, part_number: str, project_codes: List[str]) -> Dict[str, Any]:
        """Attach an item to the given project codes."""
        pn = self._quote(part_number)
        return self._request("POST", f"/items/{pn}/projects", {"projects": project_codes})

    # -- Part number generation ---------------------------------------------

    def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
        """Ask the server to generate a part number for *schema* / *category*."""
        return self._request(
            "POST",
            "/generate-part-number",
            {"schema": schema, "category": category},
        )

    # -- ODS endpoints ------------------------------------------------------

    def download_bom_ods(self, part_number: str) -> bytes:
        """Download an item's BOM export as ODS bytes."""
        return self._download(f"/items/{self._quote(part_number)}/bom/export.ods")

    def download_project_sheet(self, project_code: str) -> bytes:
        """Download a project's sheet as ODS bytes."""
        return self._download(f"/projects/{self._quote(project_code)}/sheet.ods")

    def upload_sheet_diff(self, ods_bytes: bytes, filename: str = "sheet.ods") -> Dict[str, Any]:
        """Upload an ODS sheet to ``/sheets/diff`` and return the decoded reply."""
        return self._upload_ods("/sheets/diff", ods_bytes, filename)

    # -- DAG (feature tree) -------------------------------------------------

    def push_dag(
        self,
        part_number: str,
        revision_number: int,
        nodes: List[Dict[str, Any]],
        edges: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Push a feature DAG to the server.

        Calls ``PUT /api/items/{partNumber}/dag`` with the payload
        described in ``MULTI_USER_CLIENT.md`` Section 2.
        """
        payload = {
            "revision_number": revision_number,
            "nodes": nodes,
            "edges": edges,
        }
        return self._request("PUT", f"/items/{self._quote(part_number)}/dag", payload)

    def get_dag(self, part_number: str, revision_number: Optional[int] = None) -> Dict[str, Any]:
        """Fetch the stored DAG for an item.

        Calls ``GET /api/items/{partNumber}/dag``, optionally filtered
        by *revision_number*.
        """
        path = f"/items/{self._quote(part_number)}/dag"
        if revision_number is not None:
            path += f"?revision={revision_number}"
        return self._request("GET", path)

    # -- Jobs ---------------------------------------------------------------

    def list_jobs(
        self,
        status: str = "",
        item_id: str = "",
        definition: str = "",
        limit: int = 100,
    ) -> list:
        """List jobs, optionally filtered by status, item, or definition."""
        params = [f"limit={limit}"]
        if status:
            params.append(f"status={urllib.parse.quote(status)}")
        if item_id:
            params.append(f"item_id={urllib.parse.quote(item_id)}")
        if definition:
            params.append(f"definition={urllib.parse.quote(definition)}")
        return self._request("GET", "/jobs?" + "&".join(params))

    def get_job(self, job_id: str) -> Dict[str, Any]:
        """Get details for a specific job."""
        return self._request("GET", f"/jobs/{self._quote(job_id)}")

    def get_job_logs(self, job_id: str) -> list:
        """Get log entries for a job."""
        return self._request("GET", f"/jobs/{self._quote(job_id)}/logs")

    def trigger_job(
        self,
        definition_name: str,
        part_number: str = "",
        item_id: str = "",
    ) -> Dict[str, Any]:
        """Manually trigger a job.

        Either *part_number* or *item_id* should identify the target item.
        """
        data: Dict[str, Any] = {"definition_name": definition_name}
        if part_number:
            data["part_number"] = part_number
        if item_id:
            data["item_id"] = item_id
        return self._request("POST", "/jobs", data)

    def cancel_job(self, job_id: str) -> Dict[str, Any]:
        """Cancel a pending or running job."""
        return self._request("POST", f"/jobs/{self._quote(job_id)}/cancel")

    # -- Job Definitions ----------------------------------------------------

    def list_job_definitions(self) -> list:
        """List all loaded job definitions."""
        return self._request("GET", "/job-definitions")

    def get_job_definition(self, name: str) -> Dict[str, Any]:
        """Get a specific job definition by name."""
        return self._request("GET", f"/job-definitions/{self._quote(name)}")

    def reload_job_definitions(self) -> Dict[str, Any]:
        """Re-read YAML job definitions from disk (admin only)."""
        return self._request("POST", "/job-definitions/reload")

    # -- Runners (admin) ----------------------------------------------------

    def list_runners(self) -> list:
        """List all registered runners."""
        return self._request("GET", "/runners")

    def register_runner(self, name: str, tags: List[str]) -> Dict[str, Any]:
        """Register a new runner.  Returns the token (shown once)."""
        return self._request("POST", "/runners", {"name": name, "tags": tags})

    def delete_runner(self, runner_id: str) -> None:
        """Delete a registered runner."""
        self._request("DELETE", f"/runners/{self._quote(runner_id)}", raw=True)
|