initial: shared Python API client for Silo PLM

Extracted from silo monorepo. Provides SiloClient (HTTP client) and
SiloSettings (abstract config adapter) so both the FreeCAD workbench
and LibreOffice Calc extension share the same API layer.

Includes CATEGORY_NAMES, sanitize_filename, parse_part_number,
get_category_folder_name, and SSL context builder.
This commit is contained in:
Zoe Forbes
2026-02-06 11:14:21 -06:00
commit a6ac3d4d06
6 changed files with 1014 additions and 0 deletions

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Kindred Systems LLC
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

36
README.md Normal file
View File

@@ -0,0 +1,36 @@
# silo-client
Shared Python HTTP client for the Silo REST API.
## Usage
Consumers subclass `SiloSettings` to bridge the client to their configuration storage, then instantiate `SiloClient`:
```python
from silo_client import SiloClient, SiloSettings
class MySettings(SiloSettings):
def get_api_url(self):
return "http://localhost:8080/api"
def get_api_token(self):
return os.environ.get("SILO_API_TOKEN", "")
def get_ssl_verify(self):
return True
def get_ssl_cert_path(self):
return ""
def save_auth(self, username, role="", source="", token=""):
... # persist credentials
def clear_auth(self):
... # remove credentials
client = SiloClient(MySettings())
items = client.list_items(search="M3 screw")
```
## Dependencies
None beyond the Python standard library (`urllib`, `ssl`, `json`).
## License
MIT

913
silo_client/__init__.py Normal file
View File

@@ -0,0 +1,913 @@
"""Silo API client -- shared HTTP client for Silo REST API.
This package provides ``SiloClient``, a pure-Python HTTP client for the
Silo parts-database server. It uses only ``urllib`` (no external
dependencies) so it can be embedded in FreeCAD workbenches, LibreOffice
extensions, and CLI tools without any packaging overhead.
Consumers supply a ``SiloSettings`` adapter that tells the client where
to find credentials and how to persist authentication state.
"""
import http.cookiejar
import json
import os
import re
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
from ._ssl import build_ssl_context
__version__ = "0.1.0"
# ---------------------------------------------------------------------------
# Settings protocol
# ---------------------------------------------------------------------------
class SiloSettings:
"""Abstract settings interface.
Consumers must subclass this and implement all methods to bridge the
client to their application's configuration storage (FreeCAD
preferences, JSON file, environment variables, etc.).
"""
def get_api_url(self) -> str:
"""Return the Silo API base URL (e.g. ``http://localhost:8080/api``)."""
raise NotImplementedError
def get_api_token(self) -> str:
"""Return the stored API bearer token, or ``""``."""
raise NotImplementedError
def get_ssl_verify(self) -> bool:
"""Return whether to verify server TLS certificates."""
raise NotImplementedError
def get_ssl_cert_path(self) -> str:
"""Return path to a custom CA certificate file, or ``""``."""
raise NotImplementedError
def save_auth(
self, username: str, role: str = "", source: str = "", token: str = ""
):
"""Persist authentication info after a successful login."""
raise NotImplementedError
def clear_auth(self):
"""Remove all stored authentication credentials."""
raise NotImplementedError
# ---------------------------------------------------------------------------
# Shared utilities
# ---------------------------------------------------------------------------
# Category name mapping for folder structure.
# Format: CCC -> "descriptive_name"
CATEGORY_NAMES = {
# Fasteners
"F01": "screws_bolts",
"F02": "threaded_rods",
"F03": "eyebolts",
"F04": "u_bolts",
"F05": "nuts",
"F06": "washers",
"F07": "shims",
"F08": "inserts",
"F09": "spacers",
"F10": "pins",
"F11": "anchors",
"F12": "nails",
"F13": "rivets",
"F14": "staples",
"F15": "key_stock",
"F16": "retaining_rings",
"F17": "cable_ties",
"F18": "hook_loop",
# Fluid Fittings
"C01": "full_couplings",
"C02": "half_couplings",
"C03": "reducers",
"C04": "elbows",
"C05": "tees",
"C06": "crosses",
"C07": "unions",
"C08": "adapters",
"C09": "plugs_caps",
"C10": "nipples",
"C11": "flanges",
"C12": "valves",
"C13": "quick_disconnects",
"C14": "hose_barbs",
"C15": "compression_fittings",
"C16": "tubing",
"C17": "hoses",
# Motion Components
"R01": "ball_bearings",
"R02": "roller_bearings",
"R03": "sleeve_bearings",
"R04": "thrust_bearings",
"R05": "linear_bearings",
"R06": "spur_gears",
"R07": "helical_gears",
"R08": "bevel_gears",
"R09": "worm_gears",
"R10": "rack_pinion",
"R11": "sprockets",
"R12": "timing_pulleys",
"R13": "v_belt_pulleys",
"R14": "idler_pulleys",
"R15": "wheels",
"R16": "casters",
"R17": "shaft_couplings",
"R18": "clutches",
"R19": "brakes",
"R20": "lead_screws",
"R21": "ball_screws",
"R22": "linear_rails",
"R23": "linear_actuators",
"R24": "brushed_dc_motor",
"R25": "brushless_dc_motor",
"R26": "stepper_motor",
"R27": "servo_motor",
"R28": "ac_induction_motor",
"R29": "gear_motor",
"R30": "motor_driver",
"R31": "motor_controller",
"R32": "encoder",
"R33": "pneumatic_cylinder",
"R34": "pneumatic_actuator",
"R35": "pneumatic_valve",
"R36": "pneumatic_regulator",
"R37": "pneumatic_frl_unit",
"R38": "air_compressor",
"R39": "vacuum_pump",
"R40": "hydraulic_cylinder",
"R41": "hydraulic_pump",
"R42": "hydraulic_motor",
"R43": "hydraulic_valve",
"R44": "hydraulic_accumulator",
# Structural Materials
"S01": "square_tube",
"S02": "round_tube",
"S03": "rectangular_tube",
"S04": "i_beam",
"S05": "t_slot_extrusion",
"S06": "angle",
"S07": "channel",
"S08": "flat_bar",
"S09": "round_bar",
"S10": "square_bar",
"S11": "hex_bar",
"S12": "sheet_metal",
"S13": "plate",
"S14": "expanded_metal",
"S15": "perforated_sheet",
"S16": "wire_mesh",
"S17": "grating",
# Electrical Components
"E01": "wire",
"E02": "cable",
"E03": "connectors",
"E04": "terminals",
"E05": "circuit_breakers",
"E06": "fuses",
"E07": "relays",
"E08": "contactors",
"E09": "switches",
"E10": "buttons",
"E11": "indicators",
"E12": "resistors",
"E13": "capacitors",
"E14": "inductors",
"E15": "transformers",
"E16": "diodes",
"E17": "transistors",
"E18": "ics",
"E19": "microcontrollers",
"E20": "sensors",
"E21": "displays",
"E22": "power_supplies",
"E23": "batteries",
"E24": "pcb",
"E25": "enclosures",
"E26": "heat_sinks",
"E27": "fans",
# Mechanical Components
"M01": "compression_springs",
"M02": "extension_springs",
"M03": "torsion_springs",
"M04": "gas_springs",
"M05": "dampers",
"M06": "shock_absorbers",
"M07": "vibration_mounts",
"M08": "hinges",
"M09": "latches",
"M10": "handles",
"M11": "knobs",
"M12": "levers",
"M13": "linkages",
"M14": "cams",
"M15": "bellows",
"M16": "seals",
"M17": "o_rings",
"M18": "gaskets",
# Tooling and Fixtures
"T01": "jigs",
"T02": "fixtures",
"T03": "molds",
"T04": "dies",
"T05": "gauges",
"T06": "templates",
"T07": "work_holding",
"T08": "test_fixtures",
# Assemblies
"A01": "mechanical_assembly",
"A02": "electrical_assembly",
"A03": "electromechanical_assembly",
"A04": "subassembly",
"A05": "cable_assembly",
"A06": "pneumatic_assembly",
"A07": "hydraulic_assembly",
# Purchased/Off-the-Shelf
"P01": "purchased_mechanical",
"P02": "purchased_electrical",
"P03": "purchased_assembly",
"P04": "raw_material",
"P05": "consumables",
# Custom Fabricated Parts
"X01": "machined_part",
"X02": "sheet_metal_part",
"X03": "3d_printed_part",
"X04": "cast_part",
"X05": "molded_part",
"X06": "welded_fabrication",
"X07": "laser_cut_part",
"X08": "waterjet_cut_part",
}
def sanitize_filename(name: str) -> str:
"""Sanitize a string for use in filenames."""
sanitized = re.sub(r'[<>:"/\\|?*]', "_", name)
sanitized = re.sub(r"[\s_]+", "_", sanitized)
sanitized = sanitized.strip("_ ")
return sanitized[:50]
def parse_part_number(part_number: str) -> Tuple[str, str]:
"""Parse part number into (category, sequence).
New format: CCC-NNNN (e.g., F01-0001)
"""
parts = part_number.split("-")
if len(parts) >= 2:
return parts[0], parts[1]
return part_number, ""
def get_category_folder_name(category_code: str) -> str:
"""Get the folder name for a category (e.g., ``'F01_screws_bolts'``)."""
name = CATEGORY_NAMES.get(category_code.upper(), "misc")
return f"{category_code}_{name}"
# ---------------------------------------------------------------------------
# SiloClient
# ---------------------------------------------------------------------------
class SiloClient:
"""HTTP client for the Silo REST API.
Args:
settings: A :class:`SiloSettings` adapter for configuration storage.
base_url: Optional explicit base URL (overrides ``settings``).
"""
def __init__(self, settings: SiloSettings, base_url: str = None):
self._settings = settings
self._explicit_url = base_url
# -- URL helpers --------------------------------------------------------
@property
def base_url(self) -> str:
if self._explicit_url:
return self._explicit_url.rstrip("/")
url = self._settings.get_api_url().rstrip("/")
if not url:
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
# Auto-append /api for bare origins
parsed = urllib.parse.urlparse(url)
if not parsed.path or parsed.path == "/":
url = url + "/api"
return url
@property
def _origin(self) -> str:
"""Server origin (without ``/api``) for auth endpoints."""
base = self.base_url
return base.rsplit("/api", 1)[0] if base.endswith("/api") else base
def _ssl_context(self) -> ssl.SSLContext:
return build_ssl_context(
self._settings.get_ssl_verify(),
self._settings.get_ssl_cert_path(),
)
# -- Auth headers -------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
token = self._settings.get_api_token()
if not token:
token = os.environ.get("SILO_API_TOKEN", "")
if token:
return {"Authorization": f"Bearer {token}"}
return {}
# -- Core HTTP ----------------------------------------------------------
def _request(
self,
method: str,
path: str,
data: Optional[Dict] = None,
raw: bool = False,
) -> Any:
"""Make an authenticated JSON request.
If *raw* is True the response bytes are returned instead.
"""
url = f"{self.base_url}{path}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
payload = resp.read()
if raw:
return payload
return json.loads(payload.decode())
except urllib.error.HTTPError as e:
if e.code == 401:
self._settings.clear_auth()
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download(self, path: str) -> bytes:
"""Download raw bytes from an API path."""
url = f"{self.base_url}{path}"
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return resp.read()
except urllib.error.HTTPError as e:
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download_file(
self,
part_number: str,
revision: int,
dest_path: str,
progress_callback=None,
) -> bool:
"""Download a file from MinIO storage.
Args:
progress_callback: Optional ``callable(bytes_downloaded, total_bytes)``.
*total_bytes* is ``-1`` when the server omits Content-Length.
"""
url = f"{self.base_url}/items/{part_number}/file/{revision}"
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
total = int(resp.headers.get("Content-Length", -1))
downloaded = 0
with open(dest_path, "wb") as f:
while True:
chunk = resp.read(8192)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if progress_callback is not None:
progress_callback(downloaded, total)
return True
except urllib.error.HTTPError as e:
if e.code == 404:
return False
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_file(
self, part_number: str, file_path: str, properties: Dict, comment: str = ""
) -> Dict[str, Any]:
"""Upload a file and create a new revision."""
import mimetypes
url = f"{self.base_url}/items/{part_number}/file"
with open(file_path, "rb") as f:
file_data = f.read()
boundary = "----SiloUploadBoundary" + str(hash(file_path))[-8:]
body_parts: list = []
filename = os.path.basename(file_path)
content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
body_parts.append(
f'--{boundary}\r\nContent-Disposition: form-data; name="file"; '
f'filename="{filename}"\r\nContent-Type: {content_type}\r\n\r\n'
)
body_parts.append(file_data)
body_parts.append(b"\r\n")
if comment:
body_parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="comment"\r\n\r\n'
f"{comment}\r\n"
)
if properties:
props_json = json.dumps(properties, allow_nan=False, default=str)
body_parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="properties"\r\n\r\n'
f"{props_json}\r\n"
)
body_parts.append(f"--{boundary}--\r\n")
body = b""
for part in body_parts:
body += part.encode("utf-8") if isinstance(part, str) else part
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_ods(
self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
) -> Any:
"""POST an ODS file as multipart/form-data."""
boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:]
parts: list = []
parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
)
parts.append(ods_bytes)
parts.append(f"\r\n--{boundary}--\r\n")
body = b""
for p in parts:
body += p.encode("utf-8") if isinstance(p, str) else p
url = f"{self.base_url}{path}"
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=self._ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Authentication -----------------------------------------------------
def login(
self, username: str, password: str, token_name: str = ""
) -> Dict[str, Any]:
"""Session login and create a persistent API token.
Performs: ``POST /login`` (session) -> ``GET /api/auth/me`` (verify)
-> ``POST /api/auth/tokens`` (create token). The token is persisted
via ``settings.save_auth()``.
Args:
token_name: Human-readable label for the token. Defaults to
the local hostname.
"""
ctx = self._ssl_context()
cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(cookie_jar),
urllib.request.HTTPSHandler(context=ctx),
)
# Step 1: POST credentials to /login
login_url = f"{self._origin}/login"
form_data = urllib.parse.urlencode(
{"username": username, "password": password}
).encode()
req = urllib.request.Request(
login_url,
data=form_data,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
opener.open(req)
except urllib.error.HTTPError as e:
if e.code not in (302, 303):
raise RuntimeError(f"Login failed (HTTP {e.code})")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# Step 2: Verify session via /api/auth/me
me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
try:
with opener.open(me_req) as resp:
user_info = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
raise RuntimeError("Login failed: invalid username or password")
raise RuntimeError(f"Login verification failed (HTTP {e.code})")
# Step 3: Create a persistent API token
if not token_name:
token_name = socket.gethostname()
token_body = json.dumps({"name": token_name, "expires_in_days": 90}).encode()
token_req = urllib.request.Request(
f"{self._origin}/api/auth/tokens",
data=token_body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with opener.open(token_req) as resp:
token_result = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Failed to create API token (HTTP {e.code})")
raw_token = token_result.get("token", "")
if not raw_token:
raise RuntimeError("Server did not return an API token")
self._settings.save_auth(
username=user_info.get("username", username),
role=user_info.get("role", ""),
source=user_info.get("auth_source", ""),
token=raw_token,
)
return {
"username": user_info.get("username", username),
"role": user_info.get("role", ""),
"auth_source": user_info.get("auth_source", ""),
"token_name": token_result.get("name", ""),
"token_prefix": token_result.get("token_prefix", ""),
}
def logout(self):
"""Clear stored API token and authentication info."""
self._settings.clear_auth()
def is_authenticated(self) -> bool:
"""Return True if a valid API token is configured."""
return bool(self._settings.get_api_token() or os.environ.get("SILO_API_TOKEN"))
def get_current_user(self) -> Optional[Dict[str, Any]]:
"""Fetch the current user info from the server."""
try:
return self._request("GET", "/auth/me")
except RuntimeError:
return None
def list_tokens(self) -> List[Dict[str, Any]]:
"""List API tokens for the current user."""
return self._request("GET", "/auth/tokens")
def create_token(
self, name: str, expires_in_days: Optional[int] = None
) -> Dict[str, Any]:
"""Create a new API token."""
data: Dict[str, Any] = {"name": name}
if expires_in_days is not None:
data["expires_in_days"] = expires_in_days
return self._request("POST", "/auth/tokens", data)
def revoke_token(self, token_id: str) -> None:
"""Revoke an API token by its ID."""
url = f"{self.base_url}/auth/tokens/{token_id}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
urllib.request.urlopen(req, context=self._ssl_context())
except urllib.error.HTTPError as e:
raise RuntimeError(f"API error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def check_connection(self) -> Tuple[bool, str]:
"""Check connectivity to the Silo server.
Returns ``(reachable, message)``.
"""
url = f"{self._origin}/health"
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(
req, context=self._ssl_context(), timeout=5
) as resp:
return True, f"OK ({resp.status})"
except urllib.error.HTTPError as e:
return True, f"Server error ({e.code})"
except urllib.error.URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
# -- Items --------------------------------------------------------------
def get_item(self, part_number: str) -> Dict[str, Any]:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}"
)
def list_items(
self,
search: str = "",
item_type: str = "",
project: str = "",
limit: int = 100,
) -> list:
params = [f"limit={limit}"]
if search:
params.append(f"search={urllib.parse.quote(search)}")
if item_type:
params.append(f"type={item_type}")
if project:
params.append(f"project={urllib.parse.quote(project)}")
return self._request("GET", "/items?" + "&".join(params))
def create_item(
self,
schema: str,
category: str,
description: str = "",
projects: Optional[List[str]] = None,
sourcing_type: str = "",
sourcing_link: str = "",
standard_cost: Optional[float] = None,
long_description: str = "",
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"schema": schema,
"category": category,
"description": description,
}
if projects:
data["projects"] = projects
if sourcing_type:
data["sourcing_type"] = sourcing_type
if sourcing_link:
data["sourcing_link"] = sourcing_link
if standard_cost is not None:
data["standard_cost"] = standard_cost
if long_description:
data["long_description"] = long_description
return self._request("POST", "/items", data)
def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
return self._request(
"PUT",
f"/items/{urllib.parse.quote(part_number, safe='')}",
fields,
)
# -- Revisions ----------------------------------------------------------
def get_revisions(self, part_number: str) -> list:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions",
)
def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}",
)
def compare_revisions(
self, part_number: str, from_rev: int, to_rev: int
) -> Dict[str, Any]:
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"GET",
f"/items/{pn}/revisions/compare?from={from_rev}&to={to_rev}",
)
def rollback_revision(
self, part_number: str, revision: int, comment: str = ""
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if comment:
data["comment"] = comment
pn = urllib.parse.quote(part_number, safe="")
return self._request("POST", f"/items/{pn}/revisions/{revision}/rollback", data)
def update_revision(
self,
part_number: str,
revision: int,
status: str = None,
labels: list = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if status:
data["status"] = status
if labels is not None:
data["labels"] = labels
pn = urllib.parse.quote(part_number, safe="")
return self._request("PATCH", f"/items/{pn}/revisions/{revision}", data)
def has_file(self, part_number: str) -> Tuple[bool, Optional[int]]:
"""Check if item has files in storage."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
return True, rev["revision_number"]
return False, None
except Exception:
return False, None
def latest_file_revision(self, part_number: str) -> Optional[Dict]:
"""Return the most recent revision with a file, or ``None``."""
try:
revisions = self.get_revisions(part_number)
for rev in revisions:
if rev.get("file_key"):
return rev
return None
except Exception:
return None
# -- BOM / Relationships ------------------------------------------------
def get_bom(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom")
def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/expanded?depth={depth}")
def get_bom_where_used(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/bom/where-used")
def add_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
unit: str = None,
rel_type: str = "component",
ref_des: list = None,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"child_part_number": child_pn,
"rel_type": rel_type,
}
if quantity is not None:
data["quantity"] = quantity
if unit:
data["unit"] = unit
if ref_des:
data["reference_designators"] = ref_des
if metadata:
data["metadata"] = metadata
pn = urllib.parse.quote(parent_pn, safe="")
return self._request("POST", f"/items/{pn}/bom", data)
def update_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
unit: str = None,
rel_type: str = None,
ref_des: list = None,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if quantity is not None:
data["quantity"] = quantity
if unit is not None:
data["unit"] = unit
if rel_type is not None:
data["rel_type"] = rel_type
if ref_des is not None:
data["reference_designators"] = ref_des
if metadata:
data["metadata"] = metadata
ppn = urllib.parse.quote(parent_pn, safe="")
cpn = urllib.parse.quote(child_pn, safe="")
return self._request("PUT", f"/items/{ppn}/bom/{cpn}", data)
def delete_bom_entry(self, parent_pn: str, child_pn: str) -> None:
ppn = urllib.parse.quote(parent_pn, safe="")
cpn = urllib.parse.quote(child_pn, safe="")
url = f"{self.base_url}/items/{ppn}/bom/{cpn}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
urllib.request.urlopen(req, context=self._ssl_context())
except urllib.error.HTTPError as e:
raise RuntimeError(f"API error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Schemas ------------------------------------------------------------
def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}")
def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
return self._request(
"GET",
f"/schemas/{urllib.parse.quote(name, safe='')}/properties",
)
# -- Projects -----------------------------------------------------------
def get_projects(self) -> list:
return self._request("GET", "/projects")
def get_project_items(self, code: str) -> list:
return self._request(
"GET", f"/projects/{urllib.parse.quote(code, safe='')}/items"
)
def get_item_projects(self, part_number: str) -> list:
pn = urllib.parse.quote(part_number, safe="")
return self._request("GET", f"/items/{pn}/projects")
def add_item_projects(
self, part_number: str, project_codes: List[str]
) -> Dict[str, Any]:
pn = urllib.parse.quote(part_number, safe="")
return self._request(
"POST", f"/items/{pn}/projects", {"projects": project_codes}
)
# -- Part number generation ---------------------------------------------
def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
return self._request(
"POST",
"/generate-part-number",
{"schema": schema, "category": category},
)
# -- ODS endpoints ------------------------------------------------------
def download_bom_ods(self, part_number: str) -> bytes:
pn = urllib.parse.quote(part_number, safe="")
return self._download(f"/items/{pn}/bom/export.ods")
def download_project_sheet(self, project_code: str) -> bytes:
code = urllib.parse.quote(project_code, safe="")
return self._download(f"/projects/{code}/sheet.ods")
def upload_sheet_diff(
self, ods_bytes: bytes, filename: str = "sheet.ods"
) -> Dict[str, Any]:
return self._upload_ods("/sheets/diff", ods_bytes, filename)

Binary file not shown.

Binary file not shown.

44
silo_client/_ssl.py Normal file
View File

@@ -0,0 +1,44 @@
"""SSL context builder for Silo API clients."""
import os
import ssl
def build_ssl_context(verify: bool = True, cert_path: str = "") -> ssl.SSLContext:
"""Build an SSL context honouring the caller's verify/cert preferences.
Args:
verify: Whether to verify server certificates.
cert_path: Optional path to a custom CA certificate file.
Returns:
A configured ``ssl.SSLContext``.
"""
ctx = ssl.create_default_context()
if not verify:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
if cert_path and os.path.isfile(cert_path):
try:
ctx.load_verify_locations(cert_path)
except Exception:
pass
# The bundled Python may not find the system CA store automatically
# (its compiled-in path points to the build environment). Load the
# system CA bundle explicitly so internal CAs (e.g. FreeIPA) are trusted.
for ca_path in (
"/etc/ssl/certs/ca-certificates.crt", # Debian / Ubuntu
"/etc/pki/tls/certs/ca-bundle.crt", # RHEL / CentOS
):
if os.path.isfile(ca_path):
try:
ctx.load_verify_locations(ca_path)
except Exception:
pass
break
return ctx