diff --git a/docs/CALC_EXTENSION.md b/docs/CALC_EXTENSION.md index 2a48c62..cadd706 100644 --- a/docs/CALC_EXTENSION.md +++ b/docs/CALC_EXTENSION.md @@ -1,191 +1,10 @@ # LibreOffice Calc Extension -**Last Updated:** 2026-02-01 - ---- - -## Overview - -The Silo Calc extension (`silo-calc.oxt`) is a LibreOffice Calc add-on that -connects project BOM spreadsheets directly to the Silo parts database. -Engineers work in their familiar spreadsheet environment while Silo handles -part number generation, revision tracking, and data synchronization. - -The extension is a Python UNO component packaged as an `.oxt` file. It uses -only stdlib (`urllib`, `json`, `ssl`) -- no pip dependencies. The same -`SiloClient` pattern and auth flow from the FreeCAD workbench is reused. - ---- - -## Architecture - -``` -Engineer's workstation Silo server (silod) -+--------------------------+ +------------------------+ -| LibreOffice Calc | | Go API server | -| +----------------------+ | REST | +--------------------+ | -| | Silo Extension (.oxt)| <--------> | | ODS endpoints | | -| | - Pull/Push BOM | | API | | (internal/ods) | | -| | - Completion Wizard | | | +--------------------+ | -| | - AI Describe | | | | | -| +----------------------+ | | +--------------------+ | -| UNO API | cells | | | PostgreSQL | | -| +----------------------+ | | +--------------------+ | -| | Project Workbook | | +------------------------+ -| | ~/projects/sheets/ | | -| | 3DX10/3DX10.ods | | -+--------------------------+ - -Extension also calls OpenRouter AI API directly for -description generation (does not go through silod). -``` - ---- - -## Extension Structure - -``` -pkg/calc/ - META-INF/manifest.xml Extension manifest - description.xml Extension metadata (id, version, publisher) - description/description_en.txt English description - Addons.xcu Toolbar + menu registration - ProtocolHandler.xcu Dispatch protocol registration - silo_calc_component.py UNO DispatchProvider entry point - pythonpath/silo_calc/ - __init__.py - ai_client.py OpenRouter API client - client.py SiloClient (HTTP, auth, SSL) - completion_wizard.py 3-step new item wizard - dialogs.py UNO dialog toolkit wrappers - project_files.py Local project file management - pull.py Sheet population from server - push.py Sheet changes back to server - settings.py JSON settings (~/.config/silo/calc-settings.json) - sheet_format.py Column layout constants - sync_engine.py Row hashing, classification, diff - tests/ - test_basics.py 31 unit tests (no UNO/network required) -``` - ---- - -## Toolbar Commands - -| Button | Command | Description | -|--------|---------|-------------| -| Login | `SiloLogin` | Username/password dialog, creates API token | -| Pull BOM | `SiloPullBOM` | Assembly picker -> expanded BOM -> populates sheet | -| Pull Project | `SiloPullProject` | Project picker -> all project items -> multi-sheet workbook | -| Push | `SiloPush` | Classifies rows -> creates/updates items -> auto-tags project | -| Add Item | `SiloAddItem` | Completion wizard (category -> description -> fields) | -| Refresh | `SiloRefresh` | Re-pull (placeholder) | -| Settings | `SiloSettings` | API URL, token, SSL, OpenRouter config | -| AI Describe | `SiloAIDescription` | AI description from seller description | - ---- - -## BOM Sheet Format - -28 columns total: 11 visible core, 13 hidden properties, 4 hidden sync tracking. - -### Visible Columns - -| Col | Header | Notes | -|-----|--------|-------| -| A | Item | Assembly/section header | -| B | Level | BOM depth (0=top) | -| C | Source | M=manufactured, P=purchased | -| D | PN | Part number (read-only for existing) | -| E | Description | Required for new items | -| F | Seller Description | Vendor catalog text | -| G | Unit Cost | Currency | -| H | QTY | Decimal quantity | -| I | Ext Cost | Formula =G*H (not stored) | -| J | Sourcing Link | URL | -| K | Schema | Schema name | - -### Hidden Property Columns (L-X) - -Manufacturer, Manufacturer PN, Supplier, Supplier PN, Lead Time, Min Order -Qty, Lifecycle Status, RoHS, Country of Origin, Material, Finish, Notes, -Long Description. Populated from revision properties, collapsed by default. - -### Hidden Sync Columns (Y-AB) - -`_silo_row_hash` (SHA-256), `_silo_row_status`, `_silo_updated_at`, -`_silo_parent_pn`. Used for change detection and conflict resolution. - -### Row Status Colors - -| Status | Color | Hex | -|--------|-------|-----| -| synced | light green | #C6EFCE | -| modified | light yellow | #FFEB9C | -| new | light blue | #BDD7EE | -| error | light red | #FFC7CE | -| conflict | orange | #F4B084 | - ---- - -## Completion Wizard - -Three-step guided workflow for adding new BOM rows: - -1. **Category** -- select from schema categories (F01-X08) -2. **Description** -- required text, with AI generation offer when blank -3. **Common fields** -- sourcing type, unit cost, quantity, sourcing link - -If a manually entered PN already exists in the database, the PN Conflict -Resolution dialog offers: use existing item, auto-generate new PN, or cancel. - -New items are automatically tagged with the workbook's project code. - ---- - -## OpenRouter AI Integration - -The extension calls the OpenRouter API (OpenAI-compatible) to generate -standardized part descriptions from verbose seller descriptions. This is -useful because seller descriptions are typically detailed catalog text while -BOM descriptions need to be concise (max 60 chars, title case, component -type first, standard abbreviations). - -### Configuration - -Settings dialog fields (or `OPENROUTER_API_KEY` env var): - -- **API Key** -- OpenRouter bearer token (masked in UI) -- **AI Model** -- default `openai/gpt-4.1-nano` -- **AI Instructions** -- customizable system prompt - -### Workflow - -1. Paste seller description into column F -2. Click "AI Describe" on toolbar -3. Review side-by-side dialog (seller text left, AI result right) -4. Edit if needed, click Accept -5. Description written to column E - -The AI client (`ai_client.py`) is designed for reuse. The generic -`chat_completion()` function can be called by future features (price -analysis, sourcing assistance) without modification. - ---- +The Silo Calc extension has been moved to its own repository: [silo-calc](https://git.kindred-systems.com/kindred/silo-calc). ## Server-Side ODS Support -Pure Go ODS library at `internal/ods/` for server-side spreadsheet generation. -No headless LibreOffice dependency -- ODS is a ZIP of XML files. - -### Library (`internal/ods/`) - -- `ods.go` -- types: Workbook, Sheet, Column, Row, Cell, CellType -- `writer.go` -- generates valid ODS ZIP archives -- `reader.go` -- parses ODS back to Go structs -- `ods_test.go` -- 10 round-trip tests - -### ODS Endpoints +The server-side ODS library (`internal/ods/`) and ODS endpoints remain in this repository. See `docs/SPECIFICATION.md` Section 11 for the full endpoint listing. | Method | Path | Description | |--------|------|-------------| @@ -195,61 +14,3 @@ No headless LibreOffice dependency -- ODS is a ZIP of XML files. | GET | `/api/items/{pn}/bom/export.ods` | BOM as formatted ODS | | GET | `/api/projects/{code}/sheet.ods` | Multi-sheet project workbook | | POST | `/api/sheets/diff` | Upload ODS, return JSON diff | - ---- - -## Build and Install - -```makefile -make build-calc-oxt # zip pkg/calc/ into silo-calc.oxt -make install-calc # unopkg add silo-calc.oxt -make uninstall-calc # unopkg remove io.kindredsystems.silo.calc -make test-calc # python3 -m unittest (31 tests) -``` - ---- - -## Implementation Status - -| Component | Status | Notes | -|-----------|--------|-------| -| Extension skeleton | Done | manifest, description, Addons.xcu, ProtocolHandler.xcu | -| SiloClient | Done | HTTP client adapted from FreeCAD workbench | -| Settings | Done | JSON persistence, env var fallbacks | -| Login dialog | Done | Two-step username/password | -| Settings dialog | Done | API URL, token, SSL, OpenRouter fields | -| Pull BOM | Done | Full column set, hidden groups, hash tracking | -| Pull Project | Done | Items sheet + BOM sheet | -| Push | Done | Create/update, auto project tagging, conflict detection | -| Completion wizard | Done | 3-step with PN conflict resolution | -| AI description | Done | OpenRouter client, review dialog, toolbar button | -| Refresh | Stub | Placeholder only | -| Go ODS library | Done | Writer, reader, 10 round-trip tests | -| ODS endpoints | Done | 6 handlers registered | -| Makefile targets | Done | build, install, uninstall, test, clean | - -### Known Issues - -- Refresh command is a placeholder (shows "coming soon") -- No integration tests with a running Silo instance yet -- `completion_wizard.py` uses simple input boxes instead of proper list dialogs -- Push does not yet handle BOM relationship creation (item fields only) - ---- - -## Testing - -31 unit tests in `pkg/calc/tests/test_basics.py`, runnable without UNO or -network access: - -- TestSheetFormat (7) -- column indices, headers, sheet type detection -- TestSyncEngine (9) -- hashing, classification, diff, conflict detection -- TestSettings (3) -- load/save/auth -- TestProjectFiles (3) -- path resolution, read/write -- TestAIClient (9) -- constants, configuration, error handling - -``` -$ python3 -m unittest pkg/calc/tests/test_basics.py -v -Ran 31 tests in 0.031s -OK -``` diff --git a/pkg/calc/Addons.xcu b/pkg/calc/Addons.xcu deleted file mode 100644 index 4d07ca3..0000000 --- a/pkg/calc/Addons.xcu +++ /dev/null @@ -1,235 +0,0 @@ - - - - - - - - - - - io.kindredsystems.silo.calc:SiloLogin - - - Login - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPullBOM - - - Pull BOM - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPullProject - - - Pull Project - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPush - - - Push - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloAddItem - - - Add Item - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloRefresh - - - Refresh - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloSettings - - - Settings - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloAIDescription - - - AI Describe - - - _self - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - - - - - - io.kindredsystems.silo.calc:SiloLogin - - - Silo: Login - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPullBOM - - - Silo: Pull BOM - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPullProject - - - Silo: Pull Project Items - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloPush - - - Silo: Push Changes - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloAddItem - - - Silo: Add Item - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloRefresh - - - Silo: Refresh - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloSettings - - - Silo: Settings - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - io.kindredsystems.silo.calc:SiloAIDescription - - - Silo: AI Describe - - - com.sun.star.sheet.SpreadsheetDocument - - - - - - diff --git a/pkg/calc/META-INF/manifest.xml b/pkg/calc/META-INF/manifest.xml deleted file mode 100644 index b5a62eb..0000000 --- a/pkg/calc/META-INF/manifest.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - diff --git a/pkg/calc/ProtocolHandler.xcu b/pkg/calc/ProtocolHandler.xcu deleted file mode 100644 index e50f5eb..0000000 --- a/pkg/calc/ProtocolHandler.xcu +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - io.kindredsystems.silo.calc:* - - - - diff --git a/pkg/calc/description.xml b/pkg/calc/description.xml deleted file mode 100644 index 6af2111..0000000 --- a/pkg/calc/description.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - Silo - Spreadsheet Sync - - - - Kindred Systems - - - - - - - - - - - - - diff --git a/pkg/calc/description/description_en.txt b/pkg/calc/description/description_en.txt deleted file mode 100644 index 5aeb431..0000000 --- a/pkg/calc/description/description_en.txt +++ /dev/null @@ -1,15 +0,0 @@ -Silo Spreadsheet Sync for LibreOffice Calc - -Bidirectional sync between LibreOffice Calc spreadsheets and the Silo -parts database. Pull project BOMs, edit in Calc, push changes back. - -Features: -- Pull BOM: fetch an expanded bill of materials as a formatted sheet -- Pull Project: fetch all items tagged with a project code -- Push: sync local edits (new items, modified fields) back to the database -- Add Item wizard: guided workflow for adding new BOM entries -- PN conflict resolution: handle duplicate part numbers gracefully -- Auto project tagging: items in a working BOM are tagged with the project - -Toolbar commands appear when a Calc spreadsheet is active. -Settings and API token are stored in ~/.config/silo/calc-settings.json. diff --git a/pkg/calc/pythonpath/silo_calc/__init__.py b/pkg/calc/pythonpath/silo_calc/__init__.py deleted file mode 100644 index 92e851c..0000000 --- a/pkg/calc/pythonpath/silo_calc/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Silo LibreOffice Calc extension -- spreadsheet sync for project data.""" - -__version__ = "0.1.0" diff --git a/pkg/calc/pythonpath/silo_calc/ai_client.py b/pkg/calc/pythonpath/silo_calc/ai_client.py deleted file mode 100644 index 5d81aa9..0000000 --- a/pkg/calc/pythonpath/silo_calc/ai_client.py +++ /dev/null @@ -1,217 +0,0 @@ -"""OpenRouter AI client for the Silo Calc extension. - -Provides AI-powered text generation via the OpenRouter API -(https://openrouter.ai/api/v1/chat/completions). Uses stdlib urllib -only -- no external dependencies. - -The core ``chat_completion()`` function is generic and reusable for -future features (price analysis, sourcing assistance). Domain helpers -like ``generate_description()`` build on top of it. -""" - -import json -import os -import ssl -import urllib.error -import urllib.request -from typing import Any, Dict, List, Optional - -from . import settings as _settings - -# --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- - -OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions" - -DEFAULT_MODEL = "openai/gpt-4.1-nano" - -DEFAULT_INSTRUCTIONS = ( - "You are a parts librarian for an engineering company. " - "Given a seller's product description, produce a concise, standardized " - "part description suitable for a Bill of Materials. Rules:\n" - "- Maximum 60 characters\n" - "- Use title case\n" - "- Start with the component type (e.g., Bolt, Resistor, Bearing)\n" - "- Include key specifications (size, rating, material) in order of importance\n" - "- Omit brand names, marketing language, and redundant words\n" - "- Use standard engineering abbreviations (SS, Al, M3, 1/4-20)\n" - "- Output ONLY the description, no quotes or explanation" -) - -# --------------------------------------------------------------------------- -# SSL helper (same pattern as client.py) -# --------------------------------------------------------------------------- - - -def _get_ssl_context() -> ssl.SSLContext: - """Build an SSL context for OpenRouter API calls.""" - ctx = ssl.create_default_context() - for ca_path in ( - "/etc/ssl/certs/ca-certificates.crt", - "/etc/pki/tls/certs/ca-bundle.crt", - ): - if os.path.isfile(ca_path): - try: - ctx.load_verify_locations(ca_path) - except Exception: - pass - break - return ctx - - -# --------------------------------------------------------------------------- -# Settings resolution helpers -# --------------------------------------------------------------------------- - - -def _get_api_key() -> str: - """Resolve the OpenRouter API key from settings or environment.""" - cfg = _settings.load() - key = cfg.get("openrouter_api_key", "") - if not key: - key = os.environ.get("OPENROUTER_API_KEY", "") - return key - - -def _get_model() -> str: - """Resolve the model slug from settings or default.""" - cfg = _settings.load() - return cfg.get("openrouter_model", "") or DEFAULT_MODEL - - -def _get_instructions() -> str: - """Resolve the system instructions from settings or default.""" - cfg = _settings.load() - return cfg.get("openrouter_instructions", "") or DEFAULT_INSTRUCTIONS - - -# --------------------------------------------------------------------------- -# Core API function -# --------------------------------------------------------------------------- - - -def chat_completion( - messages: List[Dict[str, str]], - model: Optional[str] = None, - temperature: float = 0.3, - max_tokens: int = 200, -) -> str: - """Send a chat completion request to OpenRouter. - - Parameters - ---------- - messages : list of {"role": str, "content": str} - model : model slug (default: from settings or DEFAULT_MODEL) - temperature : sampling temperature - max_tokens : maximum response tokens - - Returns - ------- - str : the assistant's response text - - Raises - ------ - RuntimeError : on missing API key, HTTP errors, network errors, - or unexpected response format. - """ - api_key = _get_api_key() - if not api_key: - raise RuntimeError( - "OpenRouter API key not configured. " - "Set it in Settings or the OPENROUTER_API_KEY environment variable." - ) - - model = model or _get_model() - - payload = { - "model": model, - "messages": messages, - "temperature": temperature, - "max_tokens": max_tokens, - } - - headers = { - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - "HTTP-Referer": "https://github.com/kindredsystems/silo", - "X-Title": "Silo Calc Extension", - } - - body = json.dumps(payload).encode("utf-8") - req = urllib.request.Request( - OPENROUTER_API_URL, data=body, headers=headers, method="POST" - ) - - try: - with urllib.request.urlopen( - req, context=_get_ssl_context(), timeout=30 - ) as resp: - result = json.loads(resp.read().decode("utf-8")) - except urllib.error.HTTPError as e: - error_body = e.read().decode("utf-8", errors="replace") - if e.code == 401: - raise RuntimeError("OpenRouter API key is invalid or expired.") - if e.code == 402: - raise RuntimeError("OpenRouter account has insufficient credits.") - if e.code == 429: - raise RuntimeError("OpenRouter rate limit exceeded. Try again shortly.") - raise RuntimeError(f"OpenRouter API error {e.code}: {error_body}") - except urllib.error.URLError as e: - raise RuntimeError(f"Network error contacting OpenRouter: {e.reason}") - - choices = result.get("choices", []) - if not choices: - raise RuntimeError("OpenRouter returned an empty response.") - - return choices[0].get("message", {}).get("content", "").strip() - - -# --------------------------------------------------------------------------- -# Domain helpers -# --------------------------------------------------------------------------- - - -def generate_description( - seller_description: str, - category: str = "", - existing_description: str = "", - part_number: str = "", -) -> str: - """Generate a standardized part description from a seller description. - - Parameters - ---------- - seller_description : the raw seller/vendor description text - category : category code (e.g. "F01") for context - existing_description : current description in col E, if any - part_number : the part number, for context - - Returns - ------- - str : the AI-generated standardized description - """ - system_prompt = _get_instructions() - - user_parts = [] - if category: - user_parts.append(f"Category: {category}") - if part_number: - user_parts.append(f"Part Number: {part_number}") - if existing_description: - user_parts.append(f"Current Description: {existing_description}") - user_parts.append(f"Seller Description: {seller_description}") - - user_prompt = "\n".join(user_parts) - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - return chat_completion(messages) - - -def is_configured() -> bool: - """Return True if the OpenRouter API key is available.""" - return bool(_get_api_key()) diff --git a/pkg/calc/pythonpath/silo_calc/client.py b/pkg/calc/pythonpath/silo_calc/client.py deleted file mode 100644 index 9797dbb..0000000 --- a/pkg/calc/pythonpath/silo_calc/client.py +++ /dev/null @@ -1,447 +0,0 @@ -"""Silo API client for LibreOffice Calc extension. - -Adapted from pkg/freecad/silo_commands.py SiloClient. Uses urllib (no -external dependencies) and the same auth flow: session login to obtain a -persistent API token stored in a local JSON settings file. -""" - -import http.cookiejar -import json -import os -import socket -import ssl -import urllib.error -import urllib.parse -import urllib.request -from typing import Any, Dict, List, Optional, Tuple - -from . import settings as _settings - -# --------------------------------------------------------------------------- -# SSL helpers -# --------------------------------------------------------------------------- - - -def _get_ssl_context() -> ssl.SSLContext: - """Build an SSL context honouring the user's verify/cert preferences.""" - cfg = _settings.load() - if not cfg.get("ssl_verify", True): - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - return ctx - - ctx = ssl.create_default_context() - custom_cert = cfg.get("ssl_cert_path", "") - if custom_cert and os.path.isfile(custom_cert): - try: - ctx.load_verify_locations(custom_cert) - except Exception: - pass - # Load system CA bundles (bundled Python may not find them automatically) - for ca_path in ( - "/etc/ssl/certs/ca-certificates.crt", - "/etc/pki/tls/certs/ca-bundle.crt", - ): - if os.path.isfile(ca_path): - try: - ctx.load_verify_locations(ca_path) - except Exception: - pass - break - return ctx - - -# --------------------------------------------------------------------------- -# SiloClient -# --------------------------------------------------------------------------- - - -class SiloClient: - """HTTP client for the Silo REST API.""" - - def __init__(self, base_url: str = None): - self._explicit_url = base_url - - # -- URL helpers -------------------------------------------------------- - - @property - def base_url(self) -> str: - if self._explicit_url: - return self._explicit_url.rstrip("/") - cfg = _settings.load() - url = cfg.get("api_url", "").rstrip("/") - if not url: - url = os.environ.get("SILO_API_URL", "http://localhost:8080/api") - # Auto-append /api for bare origins - parsed = urllib.parse.urlparse(url) - if not parsed.path or parsed.path == "/": - url = url + "/api" - return url - - @property - def _origin(self) -> str: - """Server origin (without /api) for auth endpoints.""" - base = self.base_url - return base.rsplit("/api", 1)[0] if base.endswith("/api") else base - - # -- Auth headers ------------------------------------------------------- - - def _auth_headers(self) -> Dict[str, str]: - token = _settings.load().get("api_token", "") or os.environ.get( - "SILO_API_TOKEN", "" - ) - if token: - return {"Authorization": f"Bearer {token}"} - return {} - - # -- Core HTTP ---------------------------------------------------------- - - def _request( - self, - method: str, - path: str, - data: Optional[Dict] = None, - raw: bool = False, - ) -> Any: - """Make an authenticated JSON request. Returns parsed JSON. - - If *raw* is True the response bytes are returned instead. - """ - url = f"{self.base_url}{path}" - headers = {"Content-Type": "application/json"} - headers.update(self._auth_headers()) - body = json.dumps(data).encode() if data else None - req = urllib.request.Request(url, data=body, headers=headers, method=method) - try: - with urllib.request.urlopen(req, context=_get_ssl_context()) as resp: - payload = resp.read() - if raw: - return payload - return json.loads(payload.decode()) - except urllib.error.HTTPError as e: - if e.code == 401: - _settings.clear_auth() - error_body = e.read().decode() - raise RuntimeError(f"API error {e.code}: {error_body}") - except urllib.error.URLError as e: - raise RuntimeError(f"Connection error: {e.reason}") - - def _download(self, path: str) -> bytes: - """Download raw bytes from an API path.""" - url = f"{self.base_url}{path}" - req = urllib.request.Request(url, headers=self._auth_headers(), method="GET") - try: - with urllib.request.urlopen(req, context=_get_ssl_context()) as resp: - return resp.read() - except urllib.error.HTTPError as e: - raise RuntimeError(f"Download error {e.code}: {e.read().decode()}") - except urllib.error.URLError as e: - raise RuntimeError(f"Connection error: {e.reason}") - - def _upload_ods( - self, path: str, ods_bytes: bytes, filename: str = "upload.ods" - ) -> Any: - """POST an ODS file as multipart/form-data.""" - boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:] - parts = [] - parts.append( - f"--{boundary}\r\n" - f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' - f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n" - ) - parts.append(ods_bytes) - parts.append(f"\r\n--{boundary}--\r\n") - - body = b"" - for p in parts: - body += p.encode("utf-8") if isinstance(p, str) else p - - url = f"{self.base_url}{path}" - headers = { - "Content-Type": f"multipart/form-data; boundary={boundary}", - "Content-Length": str(len(body)), - } - headers.update(self._auth_headers()) - req = urllib.request.Request(url, data=body, headers=headers, method="POST") - try: - with urllib.request.urlopen(req, context=_get_ssl_context()) as resp: - return json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}") - except urllib.error.URLError as e: - raise RuntimeError(f"Connection error: {e.reason}") - - # -- Authentication ----------------------------------------------------- - - def login(self, username: str, password: str) -> Dict[str, Any]: - """Session login + create persistent API token (same flow as FreeCAD).""" - ctx = _get_ssl_context() - cookie_jar = http.cookiejar.CookieJar() - opener = urllib.request.build_opener( - urllib.request.HTTPCookieProcessor(cookie_jar), - urllib.request.HTTPSHandler(context=ctx), - ) - - # Step 1: POST credentials to /login - login_url = f"{self._origin}/login" - form_data = urllib.parse.urlencode( - {"username": username, "password": password} - ).encode() - req = urllib.request.Request( - login_url, - data=form_data, - method="POST", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - try: - opener.open(req) - except urllib.error.HTTPError as e: - if e.code not in (302, 303): - raise RuntimeError(f"Login failed (HTTP {e.code})") - except urllib.error.URLError as e: - raise RuntimeError(f"Connection error: {e.reason}") - - # Step 2: Verify via /api/auth/me - me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET") - try: - with opener.open(me_req) as resp: - user_info = json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - if e.code == 401: - raise RuntimeError("Login failed: invalid username or password") - raise RuntimeError(f"Login verification failed (HTTP {e.code})") - - # Step 3: Create API token - hostname = socket.gethostname() - token_body = json.dumps( - {"name": f"LibreOffice Calc ({hostname})", "expires_in_days": 90} - ).encode() - token_req = urllib.request.Request( - f"{self._origin}/api/auth/tokens", - data=token_body, - method="POST", - headers={"Content-Type": "application/json"}, - ) - try: - with opener.open(token_req) as resp: - token_result = json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - raise RuntimeError(f"Failed to create API token (HTTP {e.code})") - - raw_token = token_result.get("token", "") - if not raw_token: - raise RuntimeError("Server did not return an API token") - - _settings.save_auth( - username=user_info.get("username", username), - role=user_info.get("role", ""), - source=user_info.get("auth_source", ""), - token=raw_token, - ) - return { - "username": user_info.get("username", username), - "role": user_info.get("role", ""), - "auth_source": user_info.get("auth_source", ""), - "token_name": token_result.get("name", ""), - } - - def logout(self): - _settings.clear_auth() - - def is_authenticated(self) -> bool: - cfg = _settings.load() - return bool(cfg.get("api_token") or os.environ.get("SILO_API_TOKEN")) - - def get_current_user(self) -> Optional[Dict[str, Any]]: - try: - return self._request("GET", "/auth/me") - except RuntimeError: - return None - - def check_connection(self) -> Tuple[bool, str]: - url = f"{self._origin}/health" - req = urllib.request.Request(url, method="GET") - try: - with urllib.request.urlopen( - req, context=_get_ssl_context(), timeout=5 - ) as resp: - return True, f"OK ({resp.status})" - except urllib.error.HTTPError as e: - return True, f"Server error ({e.code})" - except urllib.error.URLError as e: - return False, str(e.reason) - except Exception as e: - return False, str(e) - - # -- Items -------------------------------------------------------------- - - def get_item(self, part_number: str) -> Dict[str, Any]: - return self._request( - "GET", f"/items/{urllib.parse.quote(part_number, safe='')}" - ) - - def list_items(self, search: str = "", project: str = "", limit: int = 100) -> list: - params = [f"limit={limit}"] - if search: - params.append(f"search={urllib.parse.quote(search)}") - if project: - params.append(f"project={urllib.parse.quote(project)}") - return self._request("GET", "/items?" + "&".join(params)) - - def create_item( - self, - schema: str, - category: str, - description: str = "", - projects: Optional[List[str]] = None, - sourcing_type: str = "", - sourcing_link: str = "", - standard_cost: Optional[float] = None, - long_description: str = "", - ) -> Dict[str, Any]: - data: Dict[str, Any] = { - "schema": schema, - "category": category, - "description": description, - } - if projects: - data["projects"] = projects - if sourcing_type: - data["sourcing_type"] = sourcing_type - if sourcing_link: - data["sourcing_link"] = sourcing_link - if standard_cost is not None: - data["standard_cost"] = standard_cost - if long_description: - data["long_description"] = long_description - return self._request("POST", "/items", data) - - def update_item(self, part_number: str, **fields) -> Dict[str, Any]: - return self._request( - "PUT", f"/items/{urllib.parse.quote(part_number, safe='')}", fields - ) - - # -- Projects ----------------------------------------------------------- - - def get_projects(self) -> list: - return self._request("GET", "/projects") - - def get_project_items(self, code: str) -> list: - return self._request( - "GET", f"/projects/{urllib.parse.quote(code, safe='')}/items" - ) - - def add_item_projects( - self, part_number: str, project_codes: List[str] - ) -> Dict[str, Any]: - return self._request( - "POST", - f"/items/{urllib.parse.quote(part_number, safe='')}/projects", - {"projects": project_codes}, - ) - - def get_item_projects(self, part_number: str) -> list: - return self._request( - "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/projects" - ) - - # -- Schemas ------------------------------------------------------------ - - def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]: - return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}") - - def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]: - return self._request( - "GET", f"/schemas/{urllib.parse.quote(name, safe='')}/properties" - ) - - # -- BOM ---------------------------------------------------------------- - - def get_bom(self, part_number: str) -> list: - return self._request( - "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/bom" - ) - - def get_bom_expanded(self, part_number: str, depth: int = 10) -> list: - return self._request( - "GET", - f"/items/{urllib.parse.quote(part_number, safe='')}/bom/expanded?depth={depth}", - ) - - def add_bom_entry( - self, - parent_pn: str, - child_pn: str, - quantity: Optional[float] = None, - rel_type: str = "component", - metadata: Optional[Dict] = None, - ) -> Dict[str, Any]: - data: Dict[str, Any] = { - "child_part_number": child_pn, - "rel_type": rel_type, - } - if quantity is not None: - data["quantity"] = quantity - if metadata: - data["metadata"] = metadata - return self._request( - "POST", f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom", data - ) - - def update_bom_entry( - self, - parent_pn: str, - child_pn: str, - quantity: Optional[float] = None, - metadata: Optional[Dict] = None, - ) -> Dict[str, Any]: - data: Dict[str, Any] = {} - if quantity is not None: - data["quantity"] = quantity - if metadata: - data["metadata"] = metadata - return self._request( - "PUT", - f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom/{urllib.parse.quote(child_pn, safe='')}", - data, - ) - - # -- Revisions ---------------------------------------------------------- - - def get_revisions(self, part_number: str) -> list: - return self._request( - "GET", f"/items/{urllib.parse.quote(part_number, safe='')}/revisions" - ) - - def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]: - return self._request( - "GET", - f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}", - ) - - # -- ODS endpoints ------------------------------------------------------ - - def download_bom_ods(self, part_number: str) -> bytes: - return self._download( - f"/items/{urllib.parse.quote(part_number, safe='')}/bom/export.ods" - ) - - def download_project_sheet(self, project_code: str) -> bytes: - return self._download( - f"/projects/{urllib.parse.quote(project_code, safe='')}/sheet.ods" - ) - - def upload_sheet_diff( - self, ods_bytes: bytes, filename: str = "sheet.ods" - ) -> Dict[str, Any]: - return self._upload_ods("/sheets/diff", ods_bytes, filename) - - # -- Part number generation --------------------------------------------- - - def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]: - return self._request( - "POST", - "/generate-part-number", - {"schema": schema, "category": category}, - ) diff --git a/pkg/calc/pythonpath/silo_calc/completion_wizard.py b/pkg/calc/pythonpath/silo_calc/completion_wizard.py deleted file mode 100644 index 42b3119..0000000 --- a/pkg/calc/pythonpath/silo_calc/completion_wizard.py +++ /dev/null @@ -1,395 +0,0 @@ -"""Completion Wizard for adding new items to a BOM sheet. - -Three-step guided workflow: -1. Category selection (from schema) -2. Required fields (Description, optional PN) -3. Common fields (Source, Unit Cost, QTY, Sourcing Link, category-specific properties) - -If a manually entered PN already exists, the PN Conflict Resolution dialog -is shown. -""" - -from typing import Any, Dict, List, Optional, Tuple - -from . import ai_client as _ai -from . import dialogs, sync_engine -from . import settings as _settings -from . import sheet_format as sf -from .client import SiloClient - -# UNO imports -try: - import uno - - _HAS_UNO = True - - _HAS_UNO = True -except ImportError: - _HAS_UNO = False - -# Category prefix descriptions for grouping in the picker -_PREFIX_GROUPS = { - "F": "Fasteners", - "C": "Fittings", - "R": "Motion", - "S": "Structural", - "E": "Electrical", - "M": "Mechanical", - "T": "Tooling", - "A": "Assemblies", - "P": "Purchased", - "X": "Custom Fabricated", -} - -# Default sourcing type by category prefix -_DEFAULT_SOURCING = { - "A": "M", # assemblies are manufactured - "X": "M", # custom fab is manufactured - "T": "M", # tooling is manufactured -} - - -def _get_categories( - client: SiloClient, schema: str = "kindred-rd" -) -> List[Tuple[str, str]]: - """Fetch category codes and descriptions from the schema. - - Returns list of (code, description) tuples sorted by code. - """ - try: - schema_data = client.get_schema(schema) - segments = schema_data.get("segments", []) - cat_segment = None - for seg in segments: - if seg.get("name") == "category": - cat_segment = seg - break - if cat_segment and cat_segment.get("values"): - return sorted(cat_segment["values"].items()) - except RuntimeError: - pass - return [] - - -def _get_category_properties( - client: SiloClient, category: str, schema: str = "kindred-rd" -) -> List[str]: - """Fetch property field names relevant to a category. - - Returns the list of property keys that apply to the category's prefix group. - """ - try: - prop_schema = client.get_property_schema(schema) - # prop_schema has global defaults and category-specific overrides - defaults = prop_schema.get("defaults", {}) - category_props = prop_schema.get("categories", {}).get(category[:1], {}) - # Merge: category-specific fields + global defaults - all_keys = set(defaults.keys()) - all_keys.update(category_props.keys()) - return sorted(all_keys) - except RuntimeError: - return list(sf.PROPERTY_KEY_MAP.values()) - - -# --------------------------------------------------------------------------- -# Wizard dialog (UNO) -# --------------------------------------------------------------------------- - - -def run_completion_wizard( - client: SiloClient, - doc, - sheet, - insert_row: int, - project_code: str = "", - schema: str = "kindred-rd", -) -> bool: - """Run the item completion wizard. Returns True if a row was inserted. - - Parameters - ---------- - client : SiloClient - doc : XSpreadsheetDocument - sheet : XSpreadsheet - insert_row : int (0-based row index to insert at) - project_code : str (for auto-tagging) - schema : str - """ - if not _HAS_UNO: - return False - - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - - # -- Step 1: Category selection ----------------------------------------- - categories = _get_categories(client, schema) - if not categories: - dialogs._msgbox( - None, - "Add Item", - "Could not fetch categories from server.", - box_type="errorbox", - ) - return False - - # Build display list grouped by prefix - cat_display = [] - for code, desc in categories: - prefix = code[0] if code else "?" - group = _PREFIX_GROUPS.get(prefix, "Other") - cat_display.append(f"{code} - {desc} [{group}]") - - # Use a simple input box with the category list as hint - # (A proper ListBox dialog would be more polished but this is functional) - cat_hint = ", ".join(c[0] for c in categories[:20]) - if len(categories) > 20: - cat_hint += f"... ({len(categories)} total)" - - category_input = dialogs._input_box( - "Add Item - Step 1/3", - f"Category code ({cat_hint}):", - ) - if not category_input: - return False - category = category_input.strip().upper() - - # Validate category - valid_codes = {c[0] for c in categories} - if category not in valid_codes: - dialogs._msgbox( - None, - "Add Item", - f"Unknown category: {category}", - box_type="errorbox", - ) - return False - - # -- Step 2: Required fields -------------------------------------------- - description = dialogs._input_box( - "Add Item - Step 2/3", - "Description (required, leave blank to use AI):", - ) - - # If blank and AI is configured, offer AI generation from seller description - if (not description or not description.strip()) and _ai.is_configured(): - seller_desc = dialogs._input_box( - "Add Item - AI Description", - "Paste the seller description for AI generation:", - ) - if seller_desc and seller_desc.strip(): - try: - ai_desc = _ai.generate_description( - seller_description=seller_desc.strip(), - category=category, - ) - accepted = dialogs.show_ai_description_dialog( - seller_desc.strip(), ai_desc - ) - if accepted: - description = accepted - except RuntimeError as e: - dialogs._msgbox( - None, - "AI Description Failed", - str(e), - box_type="errorbox", - ) - - if not description or not description.strip(): - dialogs._msgbox( - None, "Add Item", "Description is required.", box_type="errorbox" - ) - return False - - manual_pn = dialogs._input_box( - "Add Item - Step 2/3", - "Part number (leave blank for auto-generation):", - ) - - # Check for PN conflict if user entered one - use_existing_item = None - if manual_pn and manual_pn.strip(): - manual_pn = manual_pn.strip() - try: - existing = client.get_item(manual_pn) - # PN exists -- show conflict dialog - result = dialogs.show_pn_conflict_dialog(manual_pn, existing) - if result == dialogs.PN_USE_EXISTING: - use_existing_item = existing - elif result == dialogs.PN_CREATE_NEW: - manual_pn = "" # will auto-generate - else: - return False # cancelled - except RuntimeError: - pass # PN doesn't exist, which is fine - - # -- Step 3: Common fields ---------------------------------------------- - prefix = category[0] if category else "" - default_source = _DEFAULT_SOURCING.get(prefix, "P") - - source = dialogs._input_box( - "Add Item - Step 3/3", - f"Sourcing type (M=manufactured, P=purchased) [default: {default_source}]:", - default=default_source, - ) - if source is None: - return False - source = source.strip().upper() or default_source - - unit_cost_str = dialogs._input_box( - "Add Item - Step 3/3", - "Unit cost (e.g. 10.50):", - default="0", - ) - unit_cost = 0.0 - if unit_cost_str: - try: - unit_cost = float(unit_cost_str.strip().replace("$", "").replace(",", "")) - except ValueError: - pass - - qty_str = dialogs._input_box( - "Add Item - Step 3/3", - "Quantity [default: 1]:", - default="1", - ) - qty = 1.0 - if qty_str: - try: - qty = float(qty_str.strip()) - except ValueError: - pass - - sourcing_link = ( - dialogs._input_box( - "Add Item - Step 3/3", - "Sourcing link (URL, optional):", - ) - or "" - ) - - # -- Create item or use existing ---------------------------------------- - created_item = None - if use_existing_item: - # Use the existing item's data - created_item = use_existing_item - final_pn = use_existing_item.get("part_number", manual_pn) - elif manual_pn: - # Create with the user's manual PN - try: - created_item = client.create_item( - schema=schema, - category=category, - description=description.strip(), - projects=[project_code] if project_code else None, - sourcing_type=source, - sourcing_link=sourcing_link.strip(), - standard_cost=unit_cost if unit_cost else None, - ) - final_pn = created_item.get("part_number", manual_pn) - except RuntimeError as e: - dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox") - return False - else: - # Auto-generate PN - try: - created_item = client.create_item( - schema=schema, - category=category, - description=description.strip(), - projects=[project_code] if project_code else None, - sourcing_type=source, - sourcing_link=sourcing_link.strip(), - standard_cost=unit_cost if unit_cost else None, - ) - final_pn = created_item.get("part_number", "") - except RuntimeError as e: - dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox") - return False - - if not final_pn: - dialogs._msgbox( - None, "Add Item", "No part number returned.", box_type="errorbox" - ) - return False - - # Auto-tag with project if needed - if project_code and created_item and not use_existing_item: - try: - client.add_item_projects(final_pn, [project_code]) - except RuntimeError: - pass - - # -- Insert row into sheet ---------------------------------------------- - _insert_bom_row( - sheet, - insert_row, - pn=final_pn, - description=created_item.get("description", description.strip()) - if created_item - else description.strip(), - unit_cost=unit_cost, - qty=qty, - sourcing_link=sourcing_link.strip(), - schema=schema, - status=sync_engine.STATUS_NEW, - parent_pn="", - ) - - return True - - -def _insert_bom_row( - sheet, - row: int, - pn: str, - description: str, - source: str, - unit_cost: float, - qty: float, - sourcing_link: str, - schema: str, - status: str, - parent_pn: str, -): - """Write a single BOM row at the given position with sync tracking.""" - from . import pull as _pull # avoid circular import at module level - - _pull._set_cell_string(sheet, sf.COL_ITEM, row, "") - _pull._set_cell_string(sheet, sf.COL_LEVEL, row, "") - _pull._set_cell_string(sheet, sf.COL_SOURCE, row, source) - _pull._set_cell_string(sheet, sf.COL_PN, row, pn) - _pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, description) - _pull._set_cell_string(sheet, sf.COL_SELLER_DESC, row, "") - - if unit_cost: - _pull._set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost) - _pull._set_cell_float(sheet, sf.COL_QTY, row, qty) - - # Ext Cost formula - ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" - _pull._set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) - - _pull._set_cell_string(sheet, sf.COL_SOURCING_LINK, row, sourcing_link) - _pull._set_cell_string(sheet, sf.COL_SCHEMA, row, schema) - - # Build row cells for hash computation - row_cells = [""] * sf.BOM_TOTAL_COLS - row_cells[sf.COL_SOURCE] = source - row_cells[sf.COL_PN] = pn - row_cells[sf.COL_DESCRIPTION] = description - row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else "" - row_cells[sf.COL_QTY] = str(qty) - row_cells[sf.COL_SOURCING_LINK] = sourcing_link - row_cells[sf.COL_SCHEMA] = schema - - sync_engine.update_row_sync_state(row_cells, status, parent_pn=parent_pn) - _pull._set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH]) - _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS]) - _pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT]) - _pull._set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN]) - - # Colour the row - color = _pull._STATUS_COLORS.get(status) - if color: - _pull._set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, color) diff --git a/pkg/calc/pythonpath/silo_calc/dialogs.py b/pkg/calc/pythonpath/silo_calc/dialogs.py deleted file mode 100644 index 7028735..0000000 --- a/pkg/calc/pythonpath/silo_calc/dialogs.py +++ /dev/null @@ -1,667 +0,0 @@ -"""UNO dialogs for the Silo Calc extension. - -Provides login, settings, push summary, and PN conflict resolution dialogs. -All dialogs use the UNO dialog toolkit (``com.sun.star.awt``). -""" - -from typing import Any, Dict, List, Optional, Tuple - -# UNO imports are only available inside LibreOffice -try: - import uno - - _HAS_UNO = True -except ImportError: - _HAS_UNO = False - -from . import settings as _settings -from .client import SiloClient - - -def _get_desktop(): - """Return the XSCRIPTCONTEXT desktop, or resolve via component context.""" - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx) - - -def _msgbox(parent, title: str, message: str, box_type="infobox"): - """Show a simple message box.""" - if not _HAS_UNO: - return - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - if parent is None: - parent = _get_desktop().getCurrentFrame().getContainerWindow() - mbt = uno.Enum( - "com.sun.star.awt.MessageBoxType", - "INFOBOX" if box_type == "infobox" else "ERRORBOX", - ) - msg_box = toolkit.createMessageBox(parent, mbt, 1, title, message) - msg_box.execute() - - -def _input_box( - title: str, label: str, default: str = "", password: bool = False -) -> Optional[str]: - """Show a simple single-field input dialog. Returns None on cancel.""" - if not _HAS_UNO: - return None - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - dlg_provider = smgr.createInstanceWithContext( - "com.sun.star.awt.DialogProvider", ctx - ) - - # Build dialog model programmatically - dlg_model = smgr.createInstanceWithContext( - "com.sun.star.awt.UnoControlDialogModel", ctx - ) - dlg_model.Width = 220 - dlg_model.Height = 80 - dlg_model.Title = title - - # Label - lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl.Name = "lbl" - lbl.PositionX = 10 - lbl.PositionY = 10 - lbl.Width = 200 - lbl.Height = 12 - lbl.Label = label - dlg_model.insertByName("lbl", lbl) - - # Text field - tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf.Name = "tf" - tf.PositionX = 10 - tf.PositionY = 24 - tf.Width = 200 - tf.Height = 14 - tf.Text = default - if password: - tf.EchoChar = ord("*") - dlg_model.insertByName("tf", tf) - - # OK button - btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_ok.Name = "btn_ok" - btn_ok.PositionX = 110 - btn_ok.PositionY = 50 - btn_ok.Width = 45 - btn_ok.Height = 16 - btn_ok.Label = "OK" - btn_ok.PushButtonType = 1 # OK - dlg_model.insertByName("btn_ok", btn_ok) - - # Cancel button - btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_cancel.Name = "btn_cancel" - btn_cancel.PositionX = 160 - btn_cancel.PositionY = 50 - btn_cancel.Width = 45 - btn_cancel.Height = 16 - btn_cancel.Label = "Cancel" - btn_cancel.PushButtonType = 2 # CANCEL - dlg_model.insertByName("btn_cancel", btn_cancel) - - # Create dialog control - dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) - dlg.setModel(dlg_model) - dlg.setVisible(False) - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - dlg.createPeer(toolkit, None) - - result = dlg.execute() - if result == 1: # OK - text = dlg.getControl("tf").getText() - dlg.dispose() - return text - dlg.dispose() - return None - - -# --------------------------------------------------------------------------- -# Login dialog -# --------------------------------------------------------------------------- - - -def show_login_dialog(parent=None) -> bool: - """Two-step login: username then password. Returns True on success.""" - username = _input_box("Silo Login", "Username:") - if not username: - return False - password = _input_box("Silo Login", f"Password for {username}:", password=True) - if not password: - return False - - client = SiloClient() - try: - result = client.login(username, password) - _msgbox( - parent, - "Silo Login", - f"Logged in as {result['username']} ({result.get('role', 'viewer')})", - ) - return True - except RuntimeError as e: - _msgbox(parent, "Silo Login Failed", str(e), box_type="errorbox") - return False - - -# --------------------------------------------------------------------------- -# Settings dialog -# --------------------------------------------------------------------------- - - -def show_settings_dialog(parent=None) -> bool: - """Show the settings dialog. Returns True if saved.""" - if not _HAS_UNO: - return False - - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - cfg = _settings.load() - - dlg_model = smgr.createInstanceWithContext( - "com.sun.star.awt.UnoControlDialogModel", ctx - ) - dlg_model.Width = 300 - dlg_model.Height = 200 - dlg_model.Title = "Silo Settings" - - fields = [ - ("API URL", "api_url", cfg.get("api_url", "")), - ("API Token", "api_token", cfg.get("api_token", "")), - ("SSL Cert Path", "ssl_cert_path", cfg.get("ssl_cert_path", "")), - ("Projects Dir", "projects_dir", cfg.get("projects_dir", "")), - ("Default Schema", "default_schema", cfg.get("default_schema", "kindred-rd")), - ] - - y = 10 - for label_text, name, default in fields: - lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl.Name = f"lbl_{name}" - lbl.PositionX = 10 - lbl.PositionY = y - lbl.Width = 80 - lbl.Height = 12 - lbl.Label = label_text - dlg_model.insertByName(f"lbl_{name}", lbl) - - tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf.Name = f"tf_{name}" - tf.PositionX = 95 - tf.PositionY = y - tf.Width = 195 - tf.Height = 14 - tf.Text = default - dlg_model.insertByName(f"tf_{name}", tf) - y += 22 - - # SSL verify checkbox - cb = dlg_model.createInstance("com.sun.star.awt.UnoControlCheckBoxModel") - cb.Name = "cb_ssl_verify" - cb.PositionX = 95 - cb.PositionY = y - cb.Width = 120 - cb.Height = 14 - cb.Label = "Verify SSL" - cb.State = 1 if cfg.get("ssl_verify", True) else 0 - dlg_model.insertByName("cb_ssl_verify", cb) - y += 22 - - # --- OpenRouter AI section --- - lbl_ai = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_ai.Name = "lbl_ai_section" - lbl_ai.PositionX = 10 - lbl_ai.PositionY = y - lbl_ai.Width = 280 - lbl_ai.Height = 12 - lbl_ai.Label = "--- OpenRouter AI ---" - dlg_model.insertByName("lbl_ai_section", lbl_ai) - y += 16 - - # API Key (masked) - lbl_key = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_key.Name = "lbl_openrouter_api_key" - lbl_key.PositionX = 10 - lbl_key.PositionY = y - lbl_key.Width = 80 - lbl_key.Height = 12 - lbl_key.Label = "API Key" - dlg_model.insertByName("lbl_openrouter_api_key", lbl_key) - - tf_key = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf_key.Name = "tf_openrouter_api_key" - tf_key.PositionX = 95 - tf_key.PositionY = y - tf_key.Width = 195 - tf_key.Height = 14 - tf_key.Text = cfg.get("openrouter_api_key", "") - tf_key.EchoChar = ord("*") - dlg_model.insertByName("tf_openrouter_api_key", tf_key) - y += 22 - - # AI Model - lbl_model = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_model.Name = "lbl_openrouter_model" - lbl_model.PositionX = 10 - lbl_model.PositionY = y - lbl_model.Width = 80 - lbl_model.Height = 12 - lbl_model.Label = "AI Model" - dlg_model.insertByName("lbl_openrouter_model", lbl_model) - - tf_model = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf_model.Name = "tf_openrouter_model" - tf_model.PositionX = 95 - tf_model.PositionY = y - tf_model.Width = 195 - tf_model.Height = 14 - tf_model.Text = cfg.get("openrouter_model", "") - tf_model.HelpText = "openai/gpt-4.1-nano" - dlg_model.insertByName("tf_openrouter_model", tf_model) - y += 22 - - # AI Instructions (multi-line) - lbl_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_instr.Name = "lbl_openrouter_instructions" - lbl_instr.PositionX = 10 - lbl_instr.PositionY = y - lbl_instr.Width = 80 - lbl_instr.Height = 12 - lbl_instr.Label = "AI Instructions" - dlg_model.insertByName("lbl_openrouter_instructions", lbl_instr) - - tf_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf_instr.Name = "tf_openrouter_instructions" - tf_instr.PositionX = 95 - tf_instr.PositionY = y - tf_instr.Width = 195 - tf_instr.Height = 56 - tf_instr.Text = cfg.get("openrouter_instructions", "") - tf_instr.MultiLine = True - tf_instr.VScroll = True - tf_instr.HelpText = "Custom system prompt (leave blank for default)" - dlg_model.insertByName("tf_openrouter_instructions", tf_instr) - y += 62 - - # Test connection button - btn_test = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_test.Name = "btn_test" - btn_test.PositionX = 10 - btn_test.PositionY = y - btn_test.Width = 80 - btn_test.Height = 16 - btn_test.Label = "Test Connection" - dlg_model.insertByName("btn_test", btn_test) - - # Status label - lbl_status = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_status.Name = "lbl_status" - lbl_status.PositionX = 95 - lbl_status.PositionY = y + 2 - lbl_status.Width = 195 - lbl_status.Height = 12 - lbl_status.Label = "" - dlg_model.insertByName("lbl_status", lbl_status) - y += 22 - - # OK / Cancel - btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_ok.Name = "btn_ok" - btn_ok.PositionX = 190 - btn_ok.PositionY = y - btn_ok.Width = 45 - btn_ok.Height = 16 - btn_ok.Label = "Save" - btn_ok.PushButtonType = 1 - dlg_model.insertByName("btn_ok", btn_ok) - - btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_cancel.Name = "btn_cancel" - btn_cancel.PositionX = 240 - btn_cancel.PositionY = y - btn_cancel.Width = 45 - btn_cancel.Height = 16 - btn_cancel.Label = "Cancel" - btn_cancel.PushButtonType = 2 - dlg_model.insertByName("btn_cancel", btn_cancel) - - dlg_model.Height = y + 26 - - dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) - dlg.setModel(dlg_model) - dlg.setVisible(False) - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - dlg.createPeer(toolkit, None) - - result = dlg.execute() - if result == 1: - for _, name, _ in fields: - cfg[name] = dlg.getControl(f"tf_{name}").getText() - cfg["ssl_verify"] = bool(dlg.getControl("cb_ssl_verify").getModel().State) - cfg["openrouter_api_key"] = dlg.getControl("tf_openrouter_api_key").getText() - cfg["openrouter_model"] = dlg.getControl("tf_openrouter_model").getText() - cfg["openrouter_instructions"] = dlg.getControl( - "tf_openrouter_instructions" - ).getText() - _settings.save(cfg) - dlg.dispose() - return True - - dlg.dispose() - return False - - -# --------------------------------------------------------------------------- -# Push summary dialog -# --------------------------------------------------------------------------- - - -def show_push_summary( - new_count: int, - modified_count: int, - conflict_count: int, - unchanged_count: int, - parent=None, -) -> bool: - """Show push summary and return True if user confirms.""" - lines = [ - f"New items: {new_count}", - f"Modified items: {modified_count}", - f"Conflicts: {conflict_count}", - f"Unchanged: {unchanged_count}", - ] - if conflict_count: - lines.append("\nConflicts must be resolved before pushing.") - - msg = "\n".join(lines) - if conflict_count: - _msgbox(parent, "Silo Push -- Conflicts Found", msg, box_type="errorbox") - return False - - if new_count == 0 and modified_count == 0: - _msgbox(parent, "Silo Push", "Nothing to push -- all rows are up to date.") - return False - - # Confirmation -- for now use a simple info box (OK = proceed) - _msgbox(parent, "Silo Push", f"Ready to push:\n\n{msg}\n\nProceed?") - return True - - -# --------------------------------------------------------------------------- -# PN Conflict Resolution dialog -# --------------------------------------------------------------------------- - -# Return values -PN_USE_EXISTING = "use_existing" -PN_CREATE_NEW = "create_new" -PN_CANCEL = "cancel" - - -def show_pn_conflict_dialog( - part_number: str, - existing_item: Dict[str, Any], - parent=None, -) -> str: - """Show PN conflict dialog when a manually entered PN already exists. - - Returns one of: PN_USE_EXISTING, PN_CREATE_NEW, PN_CANCEL. - """ - if not _HAS_UNO: - return PN_CANCEL - - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - - dlg_model = smgr.createInstanceWithContext( - "com.sun.star.awt.UnoControlDialogModel", ctx - ) - dlg_model.Width = 320 - dlg_model.Height = 220 - dlg_model.Title = f"Part Number Conflict: {part_number}" - - y = 10 - info_lines = [ - "This part number already exists in Silo:", - "", - f" Description: {existing_item.get('description', '')}", - f" Type: {existing_item.get('item_type', '')}", - f" Category: {existing_item.get('part_number', '')[:3]}", - f" Sourcing: {existing_item.get('sourcing_type', '')}", - f" Cost: ${existing_item.get('standard_cost', 0):.2f}", - ] - - for line in info_lines: - lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl.Name = f"info_{y}" - lbl.PositionX = 10 - lbl.PositionY = y - lbl.Width = 300 - lbl.Height = 12 - lbl.Label = line - dlg_model.insertByName(f"info_{y}", lbl) - y += 13 - - y += 5 - - # Radio buttons - rb_use = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") - rb_use.Name = "rb_use" - rb_use.PositionX = 20 - rb_use.PositionY = y - rb_use.Width = 280 - rb_use.Height = 14 - rb_use.Label = "Use existing item (add to BOM)" - rb_use.State = 1 # selected by default - dlg_model.insertByName("rb_use", rb_use) - y += 18 - - rb_new = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") - rb_new.Name = "rb_new" - rb_new.PositionX = 20 - rb_new.PositionY = y - rb_new.Width = 280 - rb_new.Height = 14 - rb_new.Label = "Create new item (auto-generate PN)" - rb_new.State = 0 - dlg_model.insertByName("rb_new", rb_new) - y += 18 - - rb_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") - rb_cancel.Name = "rb_cancel" - rb_cancel.PositionX = 20 - rb_cancel.PositionY = y - rb_cancel.Width = 280 - rb_cancel.Height = 14 - rb_cancel.Label = "Cancel" - rb_cancel.State = 0 - dlg_model.insertByName("rb_cancel", rb_cancel) - y += 25 - - # OK button - btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_ok.Name = "btn_ok" - btn_ok.PositionX = 210 - btn_ok.PositionY = y - btn_ok.Width = 45 - btn_ok.Height = 16 - btn_ok.Label = "OK" - btn_ok.PushButtonType = 1 - dlg_model.insertByName("btn_ok", btn_ok) - - btn_cancel_btn = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_cancel_btn.Name = "btn_cancel_btn" - btn_cancel_btn.PositionX = 260 - btn_cancel_btn.PositionY = y - btn_cancel_btn.Width = 45 - btn_cancel_btn.Height = 16 - btn_cancel_btn.Label = "Cancel" - btn_cancel_btn.PushButtonType = 2 - dlg_model.insertByName("btn_cancel_btn", btn_cancel_btn) - - dlg_model.Height = y + 26 - - dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) - dlg.setModel(dlg_model) - dlg.setVisible(False) - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - dlg.createPeer(toolkit, None) - - result = dlg.execute() - if result != 1: - dlg.dispose() - return PN_CANCEL - - if dlg.getControl("rb_use").getModel().State: - dlg.dispose() - return PN_USE_EXISTING - if dlg.getControl("rb_new").getModel().State: - dlg.dispose() - return PN_CREATE_NEW - - dlg.dispose() - return PN_CANCEL - - -# --------------------------------------------------------------------------- -# AI Description review dialog -# --------------------------------------------------------------------------- - - -def show_ai_description_dialog( - seller_description: str, ai_description: str, parent=None -) -> Optional[str]: - """Show AI-generated description for review/editing. - - Side-by-side layout: seller description (read-only) on the left, - AI-generated description (editable) on the right. - - Returns the accepted/edited description text, or None on cancel. - """ - if not _HAS_UNO: - return None - - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - - dlg_model = smgr.createInstanceWithContext( - "com.sun.star.awt.UnoControlDialogModel", ctx - ) - dlg_model.Width = 400 - dlg_model.Height = 210 - dlg_model.Title = "AI Description Review" - - # Left: Seller Description (read-only) - lbl_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_seller.Name = "lbl_seller" - lbl_seller.PositionX = 10 - lbl_seller.PositionY = 8 - lbl_seller.Width = 185 - lbl_seller.Height = 12 - lbl_seller.Label = "Seller Description" - dlg_model.insertByName("lbl_seller", lbl_seller) - - tf_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf_seller.Name = "tf_seller" - tf_seller.PositionX = 10 - tf_seller.PositionY = 22 - tf_seller.Width = 185 - tf_seller.Height = 140 - tf_seller.Text = seller_description - tf_seller.MultiLine = True - tf_seller.VScroll = True - tf_seller.ReadOnly = True - dlg_model.insertByName("tf_seller", tf_seller) - - # Right: Generated Description (editable) - lbl_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") - lbl_gen.Name = "lbl_gen" - lbl_gen.PositionX = 205 - lbl_gen.PositionY = 8 - lbl_gen.Width = 185 - lbl_gen.Height = 12 - lbl_gen.Label = "Generated Description (editable)" - dlg_model.insertByName("lbl_gen", lbl_gen) - - tf_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") - tf_gen.Name = "tf_gen" - tf_gen.PositionX = 205 - tf_gen.PositionY = 22 - tf_gen.Width = 185 - tf_gen.Height = 140 - tf_gen.Text = ai_description - tf_gen.MultiLine = True - tf_gen.VScroll = True - dlg_model.insertByName("tf_gen", tf_gen) - - # Accept button - btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_ok.Name = "btn_ok" - btn_ok.PositionX = 290 - btn_ok.PositionY = 175 - btn_ok.Width = 50 - btn_ok.Height = 18 - btn_ok.Label = "Accept" - btn_ok.PushButtonType = 1 # OK - dlg_model.insertByName("btn_ok", btn_ok) - - # Cancel button - btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") - btn_cancel.Name = "btn_cancel" - btn_cancel.PositionX = 345 - btn_cancel.PositionY = 175 - btn_cancel.Width = 45 - btn_cancel.Height = 18 - btn_cancel.Label = "Cancel" - btn_cancel.PushButtonType = 2 # CANCEL - dlg_model.insertByName("btn_cancel", btn_cancel) - - dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) - dlg.setModel(dlg_model) - dlg.setVisible(False) - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - dlg.createPeer(toolkit, None) - - result = dlg.execute() - if result == 1: # OK / Accept - text = dlg.getControl("tf_gen").getText() - dlg.dispose() - return text - - dlg.dispose() - return None - - -# --------------------------------------------------------------------------- -# Assembly / Project picker dialogs -# --------------------------------------------------------------------------- - - -def show_assembly_picker(client: SiloClient, parent=None) -> Optional[str]: - """Show a dialog to pick an assembly by PN. Returns the PN or None.""" - pn = _input_box("Pull BOM", "Assembly part number (e.g. A01-0003):") - return pn if pn and pn.strip() else None - - -def show_project_picker(client: SiloClient, parent=None) -> Optional[str]: - """Show a dialog to pick a project code. Returns the code or None.""" - try: - projects = client.get_projects() - except RuntimeError: - projects = [] - - if not projects: - code = _input_box("Pull Project", "Project code:") - return code if code and code.strip() else None - - # Build a choice list - choices = [f"{p.get('code', '')} - {p.get('name', '')}" for p in projects] - # For simplicity, use an input box with hint. A proper list picker - # would use a ListBox control, but this is functional for now. - hint = "Available: " + ", ".join(p.get("code", "") for p in projects) - code = _input_box("Pull Project", f"Project code ({hint}):") - return code if code and code.strip() else None diff --git a/pkg/calc/pythonpath/silo_calc/project_files.py b/pkg/calc/pythonpath/silo_calc/project_files.py deleted file mode 100644 index 97bfdce..0000000 --- a/pkg/calc/pythonpath/silo_calc/project_files.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Local project file management for ODS workbooks. - -Mirrors the FreeCAD file path pattern from ``pkg/freecad/silo_commands.py``. -Project ODS files live at:: - - ~/projects/sheets/{PROJECT_CODE}/{PROJECT_CODE}.ods - -The ``SILO_PROJECTS_DIR`` env var (shared with the FreeCAD workbench) -controls the base directory. -""" - -import os -from pathlib import Path -from typing import Optional - -from . import settings as _settings - - -def get_sheets_dir() -> Path: - """Return the base directory for ODS project sheets.""" - return _settings.get_projects_dir() / "sheets" - - -def get_project_sheet_path(project_code: str) -> Path: - """Canonical path for a project workbook. - - Example: ``~/projects/sheets/3DX10/3DX10.ods`` - """ - return get_sheets_dir() / project_code / f"{project_code}.ods" - - -def ensure_project_dir(project_code: str) -> Path: - """Create the project sheet directory if needed and return its path.""" - d = get_sheets_dir() / project_code - d.mkdir(parents=True, exist_ok=True) - return d - - -def project_sheet_exists(project_code: str) -> bool: - """Check whether a project workbook already exists locally.""" - return get_project_sheet_path(project_code).is_file() - - -def save_project_sheet(project_code: str, ods_bytes: bytes) -> Path: - """Write ODS bytes to the canonical project path. - - Returns the Path written to. - """ - ensure_project_dir(project_code) - path = get_project_sheet_path(project_code) - with open(path, "wb") as f: - f.write(ods_bytes) - return path - - -def read_project_sheet(project_code: str) -> Optional[bytes]: - """Read ODS bytes from the canonical project path, or None.""" - path = get_project_sheet_path(project_code) - if not path.is_file(): - return None - with open(path, "rb") as f: - return f.read() - - -def list_project_sheets() -> list: - """Return a list of (project_code, path) tuples for all local sheets.""" - sheets_dir = get_sheets_dir() - results = [] - if not sheets_dir.is_dir(): - return results - for entry in sorted(sheets_dir.iterdir()): - if entry.is_dir(): - ods = entry / f"{entry.name}.ods" - if ods.is_file(): - results.append((entry.name, ods)) - return results diff --git a/pkg/calc/pythonpath/silo_calc/pull.py b/pkg/calc/pythonpath/silo_calc/pull.py deleted file mode 100644 index aa5bdc5..0000000 --- a/pkg/calc/pythonpath/silo_calc/pull.py +++ /dev/null @@ -1,542 +0,0 @@ -"""Pull commands -- populate LibreOffice Calc sheets from Silo API data. - -This module handles the UNO cell-level work for SiloPullBOM and -SiloPullProject. It fetches data via the SiloClient, then writes -cells with proper formatting, formulas, hidden columns, and row -hash tracking. -""" - -from typing import Any, Dict, List, Optional - -from . import sheet_format as sf -from . import sync_engine -from .client import SiloClient - -# UNO imports -- only available inside LibreOffice -try: - import uno - from com.sun.star.beans import PropertyValue - from com.sun.star.table import CellHoriJustify - - _HAS_UNO = True -except ImportError: - _HAS_UNO = False - -# --------------------------------------------------------------------------- -# Colour helpers (UNO uses 0xRRGGBB integers) -# --------------------------------------------------------------------------- - - -def _rgb_int(r: int, g: int, b: int) -> int: - return (r << 16) | (g << 8) | b - - -_HEADER_BG = _rgb_int(68, 114, 196) # steel blue -_HEADER_FG = _rgb_int(255, 255, 255) # white text - -_STATUS_COLORS = {k: _rgb_int(*v) for k, v in sf.STATUS_COLORS.items()} - -# --------------------------------------------------------------------------- -# Cell writing helpers -# --------------------------------------------------------------------------- - - -def _set_cell_string(sheet, col: int, row: int, value: str): - cell = sheet.getCellByPosition(col, row) - cell.setString(str(value) if value else "") - - -def _set_cell_float(sheet, col: int, row: int, value, fmt: str = ""): - cell = sheet.getCellByPosition(col, row) - try: - cell.setValue(float(value)) - except (ValueError, TypeError): - cell.setString(str(value) if value else "") - - -def _set_cell_formula(sheet, col: int, row: int, formula: str): - cell = sheet.getCellByPosition(col, row) - cell.setFormula(formula) - - -def _set_row_bg(sheet, row: int, col_count: int, color: int): - """Set background colour on an entire row.""" - rng = sheet.getCellRangeByPosition(0, row, col_count - 1, row) - rng.CellBackColor = color - - -def _format_header_row(sheet, col_count: int): - """Bold white text on blue background for row 0.""" - rng = sheet.getCellRangeByPosition(0, 0, col_count - 1, 0) - rng.CellBackColor = _HEADER_BG - rng.CharColor = _HEADER_FG - rng.CharWeight = 150 # com.sun.star.awt.FontWeight.BOLD - - -def _freeze_row(doc, row: int = 1): - """Freeze panes at the given row (default: freeze header).""" - ctrl = doc.getCurrentController() - ctrl.freezeAtPosition(0, row) - - -def _hide_columns(sheet, start_col: int, end_col: int): - """Hide a range of columns (inclusive).""" - cols = sheet.getColumns() - for i in range(start_col, end_col): - col = cols.getByIndex(i) - col.IsVisible = False - - -def _set_column_width(sheet, col: int, width_mm100: int): - """Set column width in 1/100 mm.""" - cols = sheet.getColumns() - c = cols.getByIndex(col) - c.Width = width_mm100 - - -# --------------------------------------------------------------------------- -# BOM data helpers -# --------------------------------------------------------------------------- - - -def _get_meta(entry: Dict, key: str, default: str = "") -> str: - """Extract a value from a BOM entry's metadata dict.""" - meta = entry.get("metadata") or {} - val = meta.get(key, default) - return str(val) if val else default - - -def _get_meta_float(entry: Dict, key: str) -> Optional[float]: - meta = entry.get("metadata") or {} - val = meta.get(key) - if val is not None: - try: - return float(val) - except (ValueError, TypeError): - pass - return None - - -def _get_property(rev: Optional[Dict], key: str) -> str: - """Extract a property from a revision's properties dict.""" - if not rev: - return "" - props = rev.get("properties") or {} - val = props.get(key, "") - return str(val) if val else "" - - -# --------------------------------------------------------------------------- -# SiloPullBOM -# --------------------------------------------------------------------------- - - -def pull_bom( - client: SiloClient, - doc, - sheet, - assembly_pn: str, - project_code: str = "", - schema: str = "kindred-rd", -): - """Fetch an expanded BOM and populate *sheet* with formatted data. - - Parameters - ---------- - client : SiloClient - doc : XSpreadsheetDocument - sheet : XSpreadsheet (the target sheet to populate) - assembly_pn : str (top-level assembly part number) - project_code : str (project code for auto-tagging, optional) - schema : str - """ - if not _HAS_UNO: - raise RuntimeError("UNO API not available -- must run inside LibreOffice") - - # Fetch expanded BOM - bom_entries = client.get_bom_expanded(assembly_pn, depth=10) - if not bom_entries: - raise RuntimeError(f"No BOM entries found for {assembly_pn}") - - # Fetch the top-level item for the assembly name - try: - top_item = client.get_item(assembly_pn) - except RuntimeError: - top_item = {} - - # Build a cache of items and their latest revisions for property lookup - item_cache: Dict[str, Dict] = {} - rev_cache: Dict[str, Dict] = {} - - def _ensure_cached(pn: str): - if pn in item_cache: - return - try: - item_cache[pn] = client.get_item(pn) - except RuntimeError: - item_cache[pn] = {} - try: - revisions = client.get_revisions(pn) - if revisions: - rev_cache[pn] = revisions[0] # newest first - except RuntimeError: - pass - - # Pre-cache all items in the BOM - all_pns = set() - for e in bom_entries: - all_pns.add(e.get("child_part_number", "")) - all_pns.add(e.get("parent_part_number", "")) - all_pns.discard("") - for pn in all_pns: - _ensure_cached(pn) - - # -- Write header row --------------------------------------------------- - for col_idx, header in enumerate(sf.BOM_ALL_HEADERS): - _set_cell_string(sheet, col_idx, 0, header) - _format_header_row(sheet, sf.BOM_TOTAL_COLS) - - # -- Group entries by parent for section headers ------------------------ - # BOM entries come back in tree order (parent then children). - # We insert section header rows for each depth-1 sub-assembly. - - row = 1 # current write row (0 is header) - prev_parent = None - - for entry in bom_entries: - depth = entry.get("depth", 0) - child_pn = entry.get("child_part_number", "") - parent_pn = entry.get("parent_part_number", "") - child_item = item_cache.get(child_pn, {}) - child_rev = rev_cache.get(child_pn) - - # Section header: when the parent changes for depth >= 1 entries - if depth == 1 and parent_pn != prev_parent and parent_pn: - if row > 1: - # Blank separator row - row += 1 - # Sub-assembly label row - parent_item = item_cache.get(parent_pn, {}) - label = parent_item.get("description", parent_pn) - _set_cell_string(sheet, sf.COL_ITEM, row, label) - _set_cell_float(sheet, sf.COL_LEVEL, row, 0) - _set_cell_string(sheet, sf.COL_SOURCE, row, "M") - _set_cell_string(sheet, sf.COL_PN, row, parent_pn) - - # Compute sub-assembly cost from children if available - parent_cost = _compute_subassembly_cost(bom_entries, parent_pn, item_cache) - if parent_cost is not None: - _set_cell_float(sheet, sf.COL_UNIT_COST, row, parent_cost) - _set_cell_float(sheet, sf.COL_QTY, row, 1) - # Ext Cost formula - ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" - _set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) - _set_cell_string(sheet, sf.COL_SCHEMA, row, schema) - - # Sync tracking for parent row - parent_cells = [""] * sf.BOM_TOTAL_COLS - parent_cells[sf.COL_ITEM] = label - parent_cells[sf.COL_LEVEL] = "0" - parent_cells[sf.COL_SOURCE] = "M" - parent_cells[sf.COL_PN] = parent_pn - parent_cells[sf.COL_SCHEMA] = schema - sync_engine.update_row_sync_state( - parent_cells, - sync_engine.STATUS_SYNCED, - updated_at=parent_item.get("updated_at", ""), - parent_pn="", - ) - _set_cell_string(sheet, sf.COL_ROW_HASH, row, parent_cells[sf.COL_ROW_HASH]) - _set_cell_string( - sheet, sf.COL_ROW_STATUS, row, parent_cells[sf.COL_ROW_STATUS] - ) - _set_cell_string( - sheet, sf.COL_UPDATED_AT, row, parent_cells[sf.COL_UPDATED_AT] - ) - - _set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"]) - prev_parent = parent_pn - row += 1 - - # -- Write child row ----------------------------------------------- - quantity = entry.get("quantity") - unit_cost = _get_meta_float(entry, "unit_cost") - if unit_cost is None: - unit_cost = child_item.get("standard_cost") - - # Item column: blank for children (name is in the section header) - _set_cell_string(sheet, sf.COL_ITEM, row, "") - _set_cell_float(sheet, sf.COL_LEVEL, row, depth) - _set_cell_string(sheet, sf.COL_SOURCE, row, child_item.get("sourcing_type", "")) - _set_cell_string(sheet, sf.COL_PN, row, child_pn) - _set_cell_string( - sheet, sf.COL_DESCRIPTION, row, child_item.get("description", "") - ) - _set_cell_string( - sheet, sf.COL_SELLER_DESC, row, _get_meta(entry, "seller_description") - ) - - if unit_cost is not None: - _set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost) - if quantity is not None: - _set_cell_float(sheet, sf.COL_QTY, row, quantity) - - # Ext Cost formula - ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" - _set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) - - _set_cell_string( - sheet, sf.COL_SOURCING_LINK, row, child_item.get("sourcing_link", "") - ) - _set_cell_string(sheet, sf.COL_SCHEMA, row, schema) - - # -- Property columns ----------------------------------------------- - prop_values = _build_property_cells(child_item, child_rev, entry) - for i, val in enumerate(prop_values): - if val: - _set_cell_string(sheet, sf.COL_PROP_START + i, row, val) - - # -- Sync tracking --------------------------------------------------- - row_cells = [""] * sf.BOM_TOTAL_COLS - row_cells[sf.COL_LEVEL] = str(depth) - row_cells[sf.COL_SOURCE] = child_item.get("sourcing_type", "") - row_cells[sf.COL_PN] = child_pn - row_cells[sf.COL_DESCRIPTION] = child_item.get("description", "") - row_cells[sf.COL_SELLER_DESC] = _get_meta(entry, "seller_description") - row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else "" - row_cells[sf.COL_QTY] = str(quantity) if quantity else "" - row_cells[sf.COL_SOURCING_LINK] = child_item.get("sourcing_link", "") - row_cells[sf.COL_SCHEMA] = schema - for i, val in enumerate(prop_values): - row_cells[sf.COL_PROP_START + i] = val - - sync_engine.update_row_sync_state( - row_cells, - sync_engine.STATUS_SYNCED, - updated_at=child_item.get("updated_at", ""), - parent_pn=parent_pn, - ) - _set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH]) - _set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS]) - _set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT]) - _set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN]) - - _set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"]) - row += 1 - - # -- Formatting --------------------------------------------------------- - _freeze_row(doc, 1) - _hide_columns(sheet, sf.COL_PROP_START, sf.COL_PROP_END) # property cols - _hide_columns(sheet, sf.COL_SYNC_START, sf.BOM_TOTAL_COLS) # sync cols - - # Set reasonable column widths for visible columns (in 1/100 mm) - _WIDTHS = { - sf.COL_ITEM: 4500, - sf.COL_LEVEL: 1200, - sf.COL_SOURCE: 1500, - sf.COL_PN: 2500, - sf.COL_DESCRIPTION: 5000, - sf.COL_SELLER_DESC: 6000, - sf.COL_UNIT_COST: 2200, - sf.COL_QTY: 1200, - sf.COL_EXT_COST: 2200, - sf.COL_SOURCING_LINK: 5000, - sf.COL_SCHEMA: 1500, - } - for col, width in _WIDTHS.items(): - _set_column_width(sheet, col, width) - - # Auto-tag all items with the project (if a project code is set) - if project_code: - _auto_tag_project(client, all_pns, project_code) - - return row - 1 # number of data rows written - - -def _compute_subassembly_cost( - bom_entries: List[Dict], - parent_pn: str, - item_cache: Dict[str, Dict], -) -> Optional[float]: - """Sum unit_cost * quantity for direct children of parent_pn.""" - total = 0.0 - found = False - for e in bom_entries: - if e.get("parent_part_number") == parent_pn and e.get("depth", 0) > 0: - q = e.get("quantity") or 0 - uc = _get_meta_float(e, "unit_cost") - if uc is None: - child = item_cache.get(e.get("child_part_number", ""), {}) - uc = child.get("standard_cost") - if uc is not None: - total += float(uc) * float(q) - found = True - return total if found else None - - -def _build_property_cells( - item: Dict, rev: Optional[Dict], bom_entry: Dict -) -> List[str]: - """Build the property column values in order matching BOM_PROPERTY_HEADERS. - - Sources (priority): revision properties > BOM metadata > item fields. - """ - result = [] - for header in sf.BOM_PROPERTY_HEADERS: - db_key = sf.PROPERTY_KEY_MAP.get(header, "") - val = "" - # Check revision properties first - if db_key: - val = _get_property(rev, db_key) - # Fallback to BOM entry metadata - if not val and db_key: - val = _get_meta(bom_entry, db_key) - # Special case: Long Description from item field - if header == "Long Description" and not val: - val = item.get("long_description", "") - # Special case: Notes from item metadata or revision - if header == "Notes" and not val: - val = _get_meta(bom_entry, "notes") - result.append(str(val) if val else "") - return result - - -def _auto_tag_project( - client: SiloClient, - part_numbers: set, - project_code: str, -): - """Tag all part numbers with the given project code (skip failures).""" - for pn in part_numbers: - if not pn: - continue - try: - existing = client.get_item_projects(pn) - existing_codes = ( - {p.get("code", "") for p in existing} - if isinstance(existing, list) - else set() - ) - if project_code not in existing_codes: - client.add_item_projects(pn, [project_code]) - except RuntimeError: - pass # Best-effort tagging - - -# --------------------------------------------------------------------------- -# SiloPullProject -# --------------------------------------------------------------------------- - - -def pull_project( - client: SiloClient, - doc, - project_code: str, - schema: str = "kindred-rd", -): - """Fetch project items and populate an Items sheet. - - Also attempts to find an assembly and populate a BOM sheet. - """ - if not _HAS_UNO: - raise RuntimeError("UNO API not available") - - items = client.get_project_items(project_code) - if not items: - raise RuntimeError(f"No items found for project {project_code}") - - sheets = doc.getSheets() - - # -- Items sheet -------------------------------------------------------- - if sheets.hasByName("Items"): - items_sheet = sheets.getByName("Items") - else: - sheets.insertNewByName("Items", sheets.getCount()) - items_sheet = sheets.getByName("Items") - - # Header - for col_idx, header in enumerate(sf.ITEMS_HEADERS): - _set_cell_string(items_sheet, col_idx, 0, header) - header_range = items_sheet.getCellRangeByPosition( - 0, 0, len(sf.ITEMS_HEADERS) - 1, 0 - ) - header_range.CellBackColor = _HEADER_BG - header_range.CharColor = _HEADER_FG - header_range.CharWeight = 150 - - for row_idx, item in enumerate(items, start=1): - _set_cell_string(items_sheet, 0, row_idx, item.get("part_number", "")) - _set_cell_string(items_sheet, 1, row_idx, item.get("description", "")) - _set_cell_string(items_sheet, 2, row_idx, item.get("item_type", "")) - _set_cell_string(items_sheet, 3, row_idx, item.get("sourcing_type", "")) - _set_cell_string(items_sheet, 4, row_idx, schema) - cost = item.get("standard_cost") - if cost is not None: - _set_cell_float(items_sheet, 5, row_idx, cost) - _set_cell_string(items_sheet, 6, row_idx, item.get("sourcing_link", "")) - _set_cell_string(items_sheet, 7, row_idx, item.get("long_description", "")) - - # Properties from latest revision (if available) - rev = None - try: - revisions = client.get_revisions(item.get("part_number", "")) - if revisions: - rev = revisions[0] - except RuntimeError: - pass - - prop_cols = [ - "manufacturer", - "manufacturer_pn", - "supplier", - "supplier_pn", - "lead_time_days", - "minimum_order_qty", - "lifecycle_status", - "rohs_compliant", - "country_of_origin", - "material", - "finish", - "notes", - ] - for pi, prop_key in enumerate(prop_cols): - val = _get_property(rev, prop_key) - if val: - _set_cell_string(items_sheet, 8 + pi, row_idx, val) - - _set_cell_string( - items_sheet, - 20, - row_idx, - item.get("created_at", "")[:10] if item.get("created_at") else "", - ) - _set_cell_string( - items_sheet, - 21, - row_idx, - item.get("updated_at", "")[:10] if item.get("updated_at") else "", - ) - - # Freeze header - _freeze_row(doc, 1) - - # -- BOM sheet (if we can find an assembly) ----------------------------- - assemblies = [i for i in items if i.get("item_type") == "assembly"] - if assemblies: - top_assembly = assemblies[0] - top_pn = top_assembly.get("part_number", "") - - if sheets.hasByName("BOM"): - bom_sheet = sheets.getByName("BOM") - else: - sheets.insertNewByName("BOM", 0) - bom_sheet = sheets.getByName("BOM") - - try: - pull_bom( - client, doc, bom_sheet, top_pn, project_code=project_code, schema=schema - ) - except RuntimeError: - pass # BOM sheet stays empty if fetch fails - - return len(items) diff --git a/pkg/calc/pythonpath/silo_calc/push.py b/pkg/calc/pythonpath/silo_calc/push.py deleted file mode 100644 index fb1dc43..0000000 --- a/pkg/calc/pythonpath/silo_calc/push.py +++ /dev/null @@ -1,431 +0,0 @@ -"""Push command -- sync local BOM edits back to the Silo database. - -Handles: -- Row classification (new / modified / synced / conflict) -- Creating new items via the API -- Updating existing items and BOM entry metadata -- Auto-tagging new items with the project code -- Conflict detection against server timestamps -- Updating row sync state after successful push -""" - -from typing import Any, Dict, List, Optional, Tuple - -from . import sheet_format as sf -from . import sync_engine -from .client import SiloClient - -# UNO imports -try: - import uno - - _HAS_UNO = True -except ImportError: - _HAS_UNO = False - - -def _read_sheet_rows(sheet) -> List[List[str]]: - """Read all rows from a sheet as lists of strings.""" - cursor = sheet.createCursor() - cursor.gotoStartOfUsedArea(False) - cursor.gotoEndOfUsedArea(True) - addr = cursor.getRangeAddress() - end_row = addr.EndRow - end_col = max(addr.EndColumn, sf.BOM_TOTAL_COLS - 1) - - rows = [] - for r in range(end_row + 1): - row_cells = [] - for c in range(end_col + 1): - cell = sheet.getCellByPosition(c, r) - # Get display string for all cell types - val = cell.getString() - row_cells.append(val) - # Pad to full width - while len(row_cells) < sf.BOM_TOTAL_COLS: - row_cells.append("") - rows.append(row_cells) - return rows - - -def _detect_project_code(doc) -> str: - """Try to detect the project code from the file path.""" - try: - file_url = doc.getURL() - if file_url: - file_path = uno.fileUrlToSystemPath(file_url) - parts = file_path.replace("\\", "/").split("/") - if "sheets" in parts: - idx = parts.index("sheets") - if idx + 1 < len(parts): - return parts[idx + 1] - except Exception: - pass - return "" - - -def _fetch_server_timestamps( - client: SiloClient, part_numbers: List[str] -) -> Dict[str, str]: - """Fetch updated_at timestamps for a list of part numbers.""" - timestamps = {} - for pn in part_numbers: - if not pn: - continue - try: - item = client.get_item(pn) - timestamps[pn] = item.get("updated_at", "") - except RuntimeError: - pass - return timestamps - - -# --------------------------------------------------------------------------- -# Push execution -# --------------------------------------------------------------------------- - - -def push_sheet( - client: SiloClient, - doc, - sheet, - schema: str = "kindred-rd", -) -> Dict[str, Any]: - """Execute a push for the active BOM sheet. - - Returns a summary dict with counts and any errors. - """ - if not _HAS_UNO: - raise RuntimeError("UNO API not available") - - rows = _read_sheet_rows(sheet) - if not rows: - return {"created": 0, "updated": 0, "errors": [], "skipped": 0} - - project_code = _detect_project_code(doc) - - # Classify all rows - classified = sync_engine.classify_rows(rows) - - # Collect part numbers for server timestamp check - modified_pns = [ - cells[sf.COL_PN].strip() - for _, status, cells in classified - if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip() - ] - server_ts = _fetch_server_timestamps(client, modified_pns) - - # Build diff - diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts) - - results = { - "created": 0, - "updated": 0, - "errors": [], - "skipped": diff["unchanged"], - "conflicts": len(diff["conflicts"]), - } - - # -- Handle new rows: create items in the database ---------------------- - for row_info in diff["new"]: - row_idx = row_info["row_index"] - cells = rows[row_idx] - pn = cells[sf.COL_PN].strip() - desc = cells[sf.COL_DESCRIPTION].strip() - source = cells[sf.COL_SOURCE].strip() - sourcing_link = cells[sf.COL_SOURCING_LINK].strip() - unit_cost_str = cells[sf.COL_UNIT_COST].strip() - qty_str = cells[sf.COL_QTY].strip() - parent_pn = ( - cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else "" - ) - - unit_cost = None - if unit_cost_str: - try: - unit_cost = float(unit_cost_str.replace("$", "").replace(",", "")) - except ValueError: - pass - - qty = 1.0 - if qty_str: - try: - qty = float(qty_str) - except ValueError: - pass - - if not desc: - results["errors"].append( - f"Row {row_idx + 1}: description is required for new items" - ) - _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) - continue - - try: - if pn: - # Check if item already exists - try: - existing = client.get_item(pn) - # Item exists -- just update BOM relationship if parent is known - if parent_pn: - _update_bom_relationship( - client, parent_pn, pn, qty, unit_cost, cells - ) - results["updated"] += 1 - _update_row_after_push(sheet, rows, row_idx, existing) - continue - except RuntimeError: - pass # Item doesn't exist, create it - - # Detect category from PN prefix (e.g., F01-0001 -> F01) - category = pn[:3] if pn and len(pn) >= 3 else "" - - # Create the item - create_data = { - "schema": schema, - "category": category, - "description": desc, - } - if source: - create_data["sourcing_type"] = source - if sourcing_link: - create_data["sourcing_link"] = sourcing_link - if unit_cost is not None: - create_data["standard_cost"] = unit_cost - if project_code: - create_data["projects"] = [project_code] - - created = client.create_item(**create_data) - created_pn = created.get("part_number", pn) - - # Update the PN cell if it was auto-generated - if not pn and created_pn: - from . import pull as _pull - - _pull._set_cell_string(sheet, sf.COL_PN, row_idx, created_pn) - cells[sf.COL_PN] = created_pn - - # Add to parent's BOM if parent is known - if parent_pn: - _update_bom_relationship( - client, parent_pn, created_pn, qty, unit_cost, cells - ) - - # Auto-tag with project - if project_code: - try: - client.add_item_projects(created_pn, [project_code]) - except RuntimeError: - pass - - # Set property columns via revision update (if any properties set) - _push_properties(client, created_pn, cells) - - results["created"] += 1 - _update_row_after_push(sheet, rows, row_idx, created) - - except RuntimeError as e: - results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}") - _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) - - # -- Handle modified rows: update items --------------------------------- - for row_info in diff["modified"]: - row_idx = row_info["row_index"] - cells = rows[row_idx] - pn = cells[sf.COL_PN].strip() - parent_pn = ( - cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else "" - ) - - if not pn: - results["errors"].append( - f"Row {row_idx + 1}: no part number for modified row" - ) - continue - - try: - # Update item fields - update_fields = {} - desc = cells[sf.COL_DESCRIPTION].strip() - if desc: - update_fields["description"] = desc - source = cells[sf.COL_SOURCE].strip() - if source: - update_fields["sourcing_type"] = source - sourcing_link = cells[sf.COL_SOURCING_LINK].strip() - update_fields["sourcing_link"] = sourcing_link - - unit_cost_str = cells[sf.COL_UNIT_COST].strip() - unit_cost = None - if unit_cost_str: - try: - unit_cost = float(unit_cost_str.replace("$", "").replace(",", "")) - update_fields["standard_cost"] = unit_cost - except ValueError: - pass - - if update_fields: - updated = client.update_item(pn, **update_fields) - else: - updated = client.get_item(pn) - - # Update BOM relationship - qty_str = cells[sf.COL_QTY].strip() - qty = 1.0 - if qty_str: - try: - qty = float(qty_str) - except ValueError: - pass - - if parent_pn: - _update_bom_relationship(client, parent_pn, pn, qty, unit_cost, cells) - - # Update properties - _push_properties(client, pn, cells) - - # Auto-tag with project - if project_code: - try: - existing_projects = client.get_item_projects(pn) - existing_codes = ( - {p.get("code", "") for p in existing_projects} - if isinstance(existing_projects, list) - else set() - ) - if project_code not in existing_codes: - client.add_item_projects(pn, [project_code]) - except RuntimeError: - pass - - results["updated"] += 1 - _update_row_after_push(sheet, rows, row_idx, updated) - - except RuntimeError as e: - results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}") - _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) - - # -- Mark conflicts ----------------------------------------------------- - for row_info in diff["conflicts"]: - row_idx = row_info["row_index"] - _set_row_status(sheet, row_idx, sync_engine.STATUS_CONFLICT) - - return results - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _update_bom_relationship( - client: SiloClient, - parent_pn: str, - child_pn: str, - qty: float, - unit_cost: Optional[float], - cells: List[str], -): - """Create or update a BOM relationship between parent and child.""" - metadata = {} - seller_desc = ( - cells[sf.COL_SELLER_DESC].strip() if len(cells) > sf.COL_SELLER_DESC else "" - ) - if seller_desc: - metadata["seller_description"] = seller_desc - if unit_cost is not None: - metadata["unit_cost"] = unit_cost - sourcing_link = ( - cells[sf.COL_SOURCING_LINK].strip() if len(cells) > sf.COL_SOURCING_LINK else "" - ) - if sourcing_link: - metadata["sourcing_link"] = sourcing_link - - try: - # Try update first (entry may already exist) - client.update_bom_entry( - parent_pn, - child_pn, - quantity=qty, - metadata=metadata if metadata else None, - ) - except RuntimeError: - # If update fails, try creating - try: - client.add_bom_entry( - parent_pn, - child_pn, - quantity=qty, - metadata=metadata if metadata else None, - ) - except RuntimeError: - pass # Best effort - - -def _push_properties(client: SiloClient, pn: str, cells: List[str]): - """Push property column values to the item's latest revision. - - Currently this is best-effort -- the API may not support bulk property - updates in a single call. Properties are stored in revision.properties - JSONB on the server side. - """ - # Collect property values from the row - properties = {} - for i, header in enumerate(sf.BOM_PROPERTY_HEADERS): - col_idx = sf.COL_PROP_START + i - if col_idx < len(cells): - val = cells[col_idx].strip() - if val: - db_key = sf.PROPERTY_KEY_MAP.get(header, "") - if db_key: - properties[db_key] = val - - if not properties: - return - - # The Silo API stores properties on revisions. For now, we'll update - # the item's long_description if it's set, and rely on the revision - # properties being set during create or via revision update. - long_desc = properties.pop("long_description", None) - if long_desc: - try: - client.update_item(pn, long_description=long_desc) - except RuntimeError: - pass - - -def _update_row_after_push( - sheet, rows: List[List[str]], row_idx: int, item: Dict[str, Any] -): - """Update sync tracking columns after a successful push.""" - from . import pull as _pull - - cells = rows[row_idx] - - # Update the PN if the server returned one (auto-generated) - server_pn = item.get("part_number", "") - if server_pn and not cells[sf.COL_PN].strip(): - cells[sf.COL_PN] = server_pn - _pull._set_cell_string(sheet, sf.COL_PN, row_idx, server_pn) - - # Recompute hash and set synced status - sync_engine.update_row_sync_state( - cells, - sync_engine.STATUS_SYNCED, - updated_at=item.get("updated_at", ""), - ) - _pull._set_cell_string(sheet, sf.COL_ROW_HASH, row_idx, cells[sf.COL_ROW_HASH]) - _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, cells[sf.COL_ROW_STATUS]) - _pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row_idx, cells[sf.COL_UPDATED_AT]) - - _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, _pull._STATUS_COLORS["synced"]) - - -def _set_row_status(sheet, row_idx: int, status: str): - """Set just the status cell and row colour.""" - from . import pull as _pull - - _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, status) - color = _pull._STATUS_COLORS.get(status) - if color: - _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, color) diff --git a/pkg/calc/pythonpath/silo_calc/settings.py b/pkg/calc/pythonpath/silo_calc/settings.py deleted file mode 100644 index ce8438f..0000000 --- a/pkg/calc/pythonpath/silo_calc/settings.py +++ /dev/null @@ -1,94 +0,0 @@ -"""Persistent settings for the Silo Calc extension. - -Settings are stored in ``~/.config/silo/calc-settings.json``. -The file is a flat JSON dict with known keys. -""" - -import json -import os -from pathlib import Path -from typing import Any, Dict - -_SETTINGS_DIR = ( - Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")).expanduser() / "silo" -) -_SETTINGS_FILE = _SETTINGS_DIR / "calc-settings.json" - -# Default values for every known key. -_DEFAULTS: Dict[str, Any] = { - "api_url": "", - "api_token": "", - "ssl_verify": True, - "ssl_cert_path": "", - "auth_username": "", - "auth_role": "", - "auth_source": "", - "projects_dir": "", # fallback: SILO_PROJECTS_DIR env or ~/projects - "default_schema": "kindred-rd", - "openrouter_api_key": "", # fallback: OPENROUTER_API_KEY env var - "openrouter_model": "", # fallback: ai_client.DEFAULT_MODEL - "openrouter_instructions": "", # fallback: ai_client.DEFAULT_INSTRUCTIONS -} - - -def load() -> Dict[str, Any]: - """Load settings, returning defaults for any missing keys.""" - cfg = dict(_DEFAULTS) - if _SETTINGS_FILE.is_file(): - try: - with open(_SETTINGS_FILE, "r") as f: - stored = json.load(f) - cfg.update(stored) - except (json.JSONDecodeError, OSError): - pass - return cfg - - -def save(cfg: Dict[str, Any]) -> None: - """Persist the full settings dict to disk.""" - _SETTINGS_DIR.mkdir(parents=True, exist_ok=True) - with open(_SETTINGS_FILE, "w") as f: - json.dump(cfg, f, indent=2) - - -def get(key: str, default: Any = None) -> Any: - """Convenience: load a single key.""" - cfg = load() - return cfg.get(key, default) - - -def put(key: str, value: Any) -> None: - """Convenience: update a single key and persist.""" - cfg = load() - cfg[key] = value - save(cfg) - - -def save_auth(username: str, role: str = "", source: str = "", token: str = "") -> None: - """Store authentication info.""" - cfg = load() - cfg["auth_username"] = username - cfg["auth_role"] = role - cfg["auth_source"] = source - if token: - cfg["api_token"] = token - save(cfg) - - -def clear_auth() -> None: - """Remove stored auth credentials.""" - cfg = load() - cfg["api_token"] = "" - cfg["auth_username"] = "" - cfg["auth_role"] = "" - cfg["auth_source"] = "" - save(cfg) - - -def get_projects_dir() -> Path: - """Return the resolved projects base directory.""" - cfg = load() - d = cfg.get("projects_dir", "") - if not d: - d = os.environ.get("SILO_PROJECTS_DIR", "~/projects") - return Path(d).expanduser() diff --git a/pkg/calc/pythonpath/silo_calc/sheet_format.py b/pkg/calc/pythonpath/silo_calc/sheet_format.py deleted file mode 100644 index 370745b..0000000 --- a/pkg/calc/pythonpath/silo_calc/sheet_format.py +++ /dev/null @@ -1,178 +0,0 @@ -"""BOM and Items sheet column layouts, constants, and detection helpers. - -This module defines the column structure that matches the engineer's working -BOM format. Hidden property columns and sync-tracking columns are appended -to the right. -""" - -from typing import Dict, List, Optional, Tuple - -# --------------------------------------------------------------------------- -# Column indices -- BOM sheet -# --------------------------------------------------------------------------- - -# Visible core columns (always shown) -BOM_VISIBLE_HEADERS: List[str] = [ - "Item", # A - assembly label / section header - "Level", # B - depth in expanded BOM - "Source", # C - sourcing_type (M/P) - "PN", # D - part_number - "Description", # E - item description - "Seller Description", # F - metadata.seller_description - "Unit Cost", # G - standard_cost / metadata.unit_cost - "QTY", # H - quantity on relationship - "Ext Cost", # I - formula =G*H - "Sourcing Link", # J - sourcing_link - "Schema", # K - schema name -] - -# Hidden property columns (collapsed group, available when needed) -BOM_PROPERTY_HEADERS: List[str] = [ - "Manufacturer", # L - "Manufacturer PN", # M - "Supplier", # N - "Supplier PN", # O - "Lead Time (days)", # P - "Min Order Qty", # Q - "Lifecycle Status", # R - "RoHS Compliant", # S - "Country of Origin", # T - "Material", # U - "Finish", # V - "Notes", # W - "Long Description", # X -] - -# Hidden sync columns (never shown to user) -BOM_SYNC_HEADERS: List[str] = [ - "_silo_row_hash", # Y - SHA256 of row data at pull time - "_silo_row_status", # Z - synced/modified/new/error - "_silo_updated_at", # AA - server timestamp - "_silo_parent_pn", # AB - parent assembly PN for this BOM entry -] - -# All headers in order -BOM_ALL_HEADERS: List[str] = ( - BOM_VISIBLE_HEADERS + BOM_PROPERTY_HEADERS + BOM_SYNC_HEADERS -) - -# Index constants for quick access -COL_ITEM = 0 -COL_LEVEL = 1 -COL_SOURCE = 2 -COL_PN = 3 -COL_DESCRIPTION = 4 -COL_SELLER_DESC = 5 -COL_UNIT_COST = 6 -COL_QTY = 7 -COL_EXT_COST = 8 -COL_SOURCING_LINK = 9 -COL_SCHEMA = 10 - -# Property column range -COL_PROP_START = len(BOM_VISIBLE_HEADERS) # 11 -COL_PROP_END = COL_PROP_START + len(BOM_PROPERTY_HEADERS) # 24 - -# Sync column range -COL_SYNC_START = COL_PROP_END # 24 -COL_ROW_HASH = COL_SYNC_START # 24 -COL_ROW_STATUS = COL_SYNC_START + 1 # 25 -COL_UPDATED_AT = COL_SYNC_START + 2 # 26 -COL_PARENT_PN = COL_SYNC_START + 3 # 27 - -# Total column count -BOM_TOTAL_COLS = len(BOM_ALL_HEADERS) - -# --------------------------------------------------------------------------- -# Items sheet columns (flat list of all items for a project) -# --------------------------------------------------------------------------- - -ITEMS_HEADERS: List[str] = [ - "PN", - "Description", - "Type", - "Source", - "Schema", - "Standard Cost", - "Sourcing Link", - "Long Description", - "Manufacturer", - "Manufacturer PN", - "Supplier", - "Supplier PN", - "Lead Time (days)", - "Min Order Qty", - "Lifecycle Status", - "RoHS Compliant", - "Country of Origin", - "Material", - "Finish", - "Notes", - "Created", - "Updated", -] - -# --------------------------------------------------------------------------- -# Property key mapping (header name -> DB field path) -# --------------------------------------------------------------------------- - -PROPERTY_KEY_MAP: Dict[str, str] = { - "Manufacturer": "manufacturer", - "Manufacturer PN": "manufacturer_pn", - "Supplier": "supplier", - "Supplier PN": "supplier_pn", - "Lead Time (days)": "lead_time_days", - "Min Order Qty": "minimum_order_qty", - "Lifecycle Status": "lifecycle_status", - "RoHS Compliant": "rohs_compliant", - "Country of Origin": "country_of_origin", - "Material": "material", - "Finish": "finish", - "Notes": "notes", -} - -# Reverse map -DB_FIELD_TO_HEADER: Dict[str, str] = {v: k for k, v in PROPERTY_KEY_MAP.items()} - -# --------------------------------------------------------------------------- -# Row status colours (RGB tuples, 0-255) -# --------------------------------------------------------------------------- - -STATUS_COLORS: Dict[str, Tuple[int, int, int]] = { - "synced": (198, 239, 206), # light green #C6EFCE - "modified": (255, 235, 156), # light yellow #FFEB9C - "new": (189, 215, 238), # light blue #BDD7EE - "error": (255, 199, 206), # light red #FFC7CE - "conflict": (244, 176, 132), # orange #F4B084 -} - -# --------------------------------------------------------------------------- -# Sheet type detection -# --------------------------------------------------------------------------- - - -def detect_sheet_type(headers: List[str]) -> Optional[str]: - """Detect sheet type from the first row of headers. - - Returns ``"bom"``, ``"items"``, or ``None`` if unrecognised. - """ - if not headers: - return None - # Normalise for comparison - norm = [h.strip().lower() for h in headers] - if "item" in norm and "level" in norm and "qty" in norm: - return "bom" - if "pn" in norm and "type" in norm: - return "items" - return None - - -def col_letter(index: int) -> str: - """Convert 0-based column index to spreadsheet letter (A, B, ..., AA, AB).""" - result = "" - while True: - result = chr(65 + index % 26) + result - index = index // 26 - 1 - if index < 0: - break - return result diff --git a/pkg/calc/pythonpath/silo_calc/sync_engine.py b/pkg/calc/pythonpath/silo_calc/sync_engine.py deleted file mode 100644 index e00a360..0000000 --- a/pkg/calc/pythonpath/silo_calc/sync_engine.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Row hashing, diff classification, and sync state tracking. - -Used by push/pull commands to detect which rows have been modified locally -since the last pull, and to detect conflicts with server-side changes. -""" - -import hashlib -import json -from typing import Any, Dict, List, Optional, Tuple - -from . import sheet_format as sf - -# Row statuses -STATUS_SYNCED = "synced" -STATUS_MODIFIED = "modified" -STATUS_NEW = "new" -STATUS_ERROR = "error" -STATUS_CONFLICT = "conflict" - - -def compute_row_hash(cells: List[str]) -> str: - """SHA-256 hash of the visible + property columns of a row. - - Only the data columns are hashed (not sync tracking columns). - Blank/empty cells are normalised to the empty string. - """ - # Use columns 0..COL_PROP_END-1 (visible + properties, not sync cols) - data_cells = cells[: sf.COL_PROP_END] - # Normalise - normalised = [str(c).strip() if c else "" for c in data_cells] - raw = "\t".join(normalised).encode("utf-8") - return hashlib.sha256(raw).hexdigest() - - -def classify_row(cells: List[str]) -> str: - """Return the sync status of a single row. - - Reads the stored hash and current cell values to determine whether - the row is synced, modified, new, or in an error state. - """ - # Ensure we have enough columns - while len(cells) < sf.BOM_TOTAL_COLS: - cells.append("") - - stored_hash = cells[sf.COL_ROW_HASH].strip() if cells[sf.COL_ROW_HASH] else "" - stored_status = cells[sf.COL_ROW_STATUS].strip() if cells[sf.COL_ROW_STATUS] else "" - - # No hash -> new row (never pulled from server) - if not stored_hash: - # Check if there's any data in the row - has_data = any( - cells[i].strip() - for i in range(sf.COL_PROP_END) - if i < len(cells) and cells[i] - ) - return STATUS_NEW if has_data else "" - - # Compute current hash and compare - current_hash = compute_row_hash(cells) - if current_hash == stored_hash: - return STATUS_SYNCED - return STATUS_MODIFIED - - -def classify_rows(all_rows: List[List[str]]) -> List[Tuple[int, str, List[str]]]: - """Classify every row in a sheet. - - Returns list of ``(row_index, status, cells)`` for rows that have data. - Blank separator rows and the header row (index 0) are skipped. - """ - results = [] - for i, cells in enumerate(all_rows): - if i == 0: - continue # header row - status = classify_row(list(cells)) - if status: - results.append((i, status, list(cells))) - return results - - -def build_push_diff( - classified: List[Tuple[int, str, List[str]]], - server_timestamps: Optional[Dict[str, str]] = None, -) -> Dict[str, List[Dict[str, Any]]]: - """Build a push diff from classified rows. - - *server_timestamps* maps part numbers to their server ``updated_at`` - values, used for conflict detection. - - Returns a dict with keys ``new``, ``modified``, ``conflicts``, and - the count of ``unchanged`` rows. - """ - server_ts = server_timestamps or {} - new_rows = [] - modified_rows = [] - conflicts = [] - unchanged = 0 - - for row_idx, status, cells in classified: - if status == STATUS_SYNCED: - unchanged += 1 - continue - - pn = cells[sf.COL_PN].strip() if len(cells) > sf.COL_PN else "" - stored_ts = ( - cells[sf.COL_UPDATED_AT].strip() - if len(cells) > sf.COL_UPDATED_AT and cells[sf.COL_UPDATED_AT] - else "" - ) - - row_info = { - "row_index": row_idx, - "part_number": pn, - "description": cells[sf.COL_DESCRIPTION].strip() - if len(cells) > sf.COL_DESCRIPTION - else "", - "cells": cells[: sf.COL_PROP_END], - } - - if status == STATUS_NEW: - new_rows.append(row_info) - elif status == STATUS_MODIFIED: - # Check for conflict: server changed since we pulled - server_updated = server_ts.get(pn, "") - if stored_ts and server_updated and server_updated != stored_ts: - row_info["local_ts"] = stored_ts - row_info["server_ts"] = server_updated - conflicts.append(row_info) - else: - modified_rows.append(row_info) - - return { - "new": new_rows, - "modified": modified_rows, - "conflicts": conflicts, - "unchanged": unchanged, - } - - -def update_row_sync_state( - cells: List[str], - status: str, - updated_at: str = "", - parent_pn: str = "", -) -> List[str]: - """Set the sync tracking columns on a row and return it. - - Recomputes the row hash from current visible+property data. - """ - while len(cells) < sf.BOM_TOTAL_COLS: - cells.append("") - - cells[sf.COL_ROW_HASH] = compute_row_hash(cells) - cells[sf.COL_ROW_STATUS] = status - if updated_at: - cells[sf.COL_UPDATED_AT] = updated_at - if parent_pn: - cells[sf.COL_PARENT_PN] = parent_pn - - return cells diff --git a/pkg/calc/silo_calc_component.py b/pkg/calc/silo_calc_component.py deleted file mode 100644 index affcd78..0000000 --- a/pkg/calc/silo_calc_component.py +++ /dev/null @@ -1,496 +0,0 @@ -"""UNO ProtocolHandler component for the Silo Calc extension. - -This file is registered in META-INF/manifest.xml and acts as the entry -point for all toolbar / menu commands. Each custom protocol URL -dispatches to a handler function that orchestrates the corresponding -feature. - -All silo_calc submodule imports are deferred to handler call time so -that the component registration always succeeds even if a submodule -has issues. -""" - -import os -import sys -import traceback - -import uno -import unohelper -from com.sun.star.frame import XDispatch, XDispatchProvider -from com.sun.star.lang import XInitialization, XServiceInfo - -# Ensure pythonpath/ is importable -_ext_dir = os.path.dirname(os.path.abspath(__file__)) -_pypath = os.path.join(_ext_dir, "pythonpath") -if _pypath not in sys.path: - sys.path.insert(0, _pypath) - -# Service identifiers -_IMPL_NAME = "io.kindredsystems.silo.calc.Component" -_SERVICE_NAME = "com.sun.star.frame.ProtocolHandler" -_PROTOCOL = "io.kindredsystems.silo.calc:" - - -def _log(msg: str): - """Print to the LibreOffice terminal / stderr.""" - print(f"[Silo Calc] {msg}") - - -def _get_desktop(): - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx) - - -def _get_active_sheet(): - """Return (doc, sheet) for the current spreadsheet, or (None, None).""" - desktop = _get_desktop() - doc = desktop.getCurrentComponent() - if doc is None: - return None, None - if not doc.supportsService("com.sun.star.sheet.SpreadsheetDocument"): - return None, None - sheet = doc.getSheets().getByIndex( - doc.getCurrentController().getActiveSheet().getRangeAddress().Sheet - ) - return doc, sheet - - -def _msgbox(title, message, box_type="infobox"): - """Lightweight message box that doesn't depend on dialogs module.""" - ctx = uno.getComponentContext() - smgr = ctx.ServiceManager - toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) - parent = _get_desktop().getCurrentFrame().getContainerWindow() - mbt = uno.Enum( - "com.sun.star.awt.MessageBoxType", - "INFOBOX" if box_type == "infobox" else "ERRORBOX", - ) - box = toolkit.createMessageBox(parent, mbt, 1, title, message) - box.execute() - - -# --------------------------------------------------------------------------- -# Command handlers -- imports are deferred to call time -# --------------------------------------------------------------------------- - - -def _cmd_login(frame): - from silo_calc import dialogs - - dialogs.show_login_dialog() - - -def _cmd_settings(frame): - from silo_calc import dialogs - - dialogs.show_settings_dialog() - - -def _cmd_pull_bom(frame): - """Pull a BOM from the server and populate the active sheet.""" - from silo_calc import dialogs, project_files - from silo_calc import pull as _pull - from silo_calc import settings as _settings - from silo_calc.client import SiloClient - - client = SiloClient() - if not client.is_authenticated(): - dialogs.show_login_dialog() - client = SiloClient() # reload after login - if not client.is_authenticated(): - return - - pn = dialogs.show_assembly_picker(client) - if not pn: - return - - project_code = ( - dialogs._input_box("Pull BOM", "Project code for auto-tagging (optional):") - or "" - ) - - doc, sheet = _get_active_sheet() - if doc is None: - desktop = _get_desktop() - doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ()) - sheet = doc.getSheets().getByIndex(0) - sheet.setName("BOM") - - try: - count = _pull.pull_bom( - client, - doc, - sheet, - pn, - project_code=project_code.strip(), - schema=_settings.get("default_schema", "kindred-rd"), - ) - _log(f"Pulled BOM for {pn}: {count} rows") - - if project_code.strip(): - path = project_files.get_project_sheet_path(project_code.strip()) - project_files.ensure_project_dir(project_code.strip()) - url = uno.systemPathToFileUrl(str(path)) - doc.storeToURL(url, ()) - _log(f"Saved to {path}") - - _msgbox("Pull BOM", f"Pulled {count} rows for {pn}.") - except RuntimeError as e: - _msgbox("Pull BOM Failed", str(e), box_type="errorbox") - - -def _cmd_pull_project(frame): - """Pull all project items as a multi-sheet workbook.""" - from silo_calc import dialogs, project_files - from silo_calc import pull as _pull - from silo_calc import settings as _settings - from silo_calc.client import SiloClient - - client = SiloClient() - if not client.is_authenticated(): - dialogs.show_login_dialog() - client = SiloClient() - if not client.is_authenticated(): - return - - code = dialogs.show_project_picker(client) - if not code: - return - - doc, _ = _get_active_sheet() - if doc is None: - desktop = _get_desktop() - doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ()) - - try: - count = _pull.pull_project( - client, - doc, - code.strip(), - schema=_settings.get("default_schema", "kindred-rd"), - ) - _log(f"Pulled project {code}: {count} items") - - path = project_files.get_project_sheet_path(code.strip()) - project_files.ensure_project_dir(code.strip()) - url = uno.systemPathToFileUrl(str(path)) - doc.storeToURL(url, ()) - _log(f"Saved to {path}") - - _msgbox("Pull Project", f"Pulled {count} items for project {code}.") - except RuntimeError as e: - _msgbox("Pull Project Failed", str(e), box_type="errorbox") - - -def _cmd_push(frame): - """Push local changes back to the server.""" - from silo_calc import dialogs, sync_engine - from silo_calc import push as _push - from silo_calc import settings as _settings - from silo_calc import sheet_format as sf - from silo_calc.client import SiloClient - - client = SiloClient() - if not client.is_authenticated(): - dialogs.show_login_dialog() - client = SiloClient() - if not client.is_authenticated(): - return - - doc, sheet = _get_active_sheet() - if doc is None or sheet is None: - _msgbox("Push", "No active spreadsheet.", box_type="errorbox") - return - - rows = _push._read_sheet_rows(sheet) - classified = sync_engine.classify_rows(rows) - - modified_pns = [ - cells[sf.COL_PN].strip() - for _, status, cells in classified - if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip() - ] - server_ts = _push._fetch_server_timestamps(client, modified_pns) - diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts) - - ok = dialogs.show_push_summary( - new_count=len(diff["new"]), - modified_count=len(diff["modified"]), - conflict_count=len(diff["conflicts"]), - unchanged_count=diff["unchanged"], - ) - if not ok: - return - - try: - results = _push.push_sheet( - client, - doc, - sheet, - schema=_settings.get("default_schema", "kindred-rd"), - ) - except RuntimeError as e: - _msgbox("Push Failed", str(e), box_type="errorbox") - return - - try: - file_url = doc.getURL() - if file_url: - doc.store() - except Exception: - pass - - summary_lines = [ - f"Created: {results['created']}", - f"Updated: {results['updated']}", - f"Conflicts: {results.get('conflicts', 0)}", - f"Skipped: {results['skipped']}", - ] - if results["errors"]: - summary_lines.append(f"\nErrors ({len(results['errors'])}):") - for err in results["errors"][:10]: - summary_lines.append(f" - {err}") - if len(results["errors"]) > 10: - summary_lines.append(f" ... and {len(results['errors']) - 10} more") - - _msgbox("Push Complete", "\n".join(summary_lines)) - _log(f"Push complete: {results['created']} created, {results['updated']} updated") - - -def _cmd_add_item(frame): - """Completion wizard for adding a new BOM row.""" - from silo_calc import completion_wizard as _wizard - from silo_calc import settings as _settings - from silo_calc.client import SiloClient - - client = SiloClient() - if not client.is_authenticated(): - from silo_calc import dialogs - - dialogs.show_login_dialog() - client = SiloClient() - if not client.is_authenticated(): - return - - doc, sheet = _get_active_sheet() - if doc is None or sheet is None: - _msgbox("Add Item", "No active spreadsheet.", box_type="errorbox") - return - - project_code = "" - try: - file_url = doc.getURL() - if file_url: - file_path = uno.fileUrlToSystemPath(file_url) - parts = file_path.replace("\\", "/").split("/") - if "sheets" in parts: - idx = parts.index("sheets") - if idx + 1 < len(parts): - project_code = parts[idx + 1] - except Exception: - pass - - cursor = sheet.createCursor() - cursor.gotoStartOfUsedArea(False) - cursor.gotoEndOfUsedArea(True) - insert_row = cursor.getRangeAddress().EndRow + 1 - - ok = _wizard.run_completion_wizard( - client, - doc, - sheet, - insert_row, - project_code=project_code, - schema=_settings.get("default_schema", "kindred-rd"), - ) - if ok: - _log(f"Added new item at row {insert_row + 1}") - - -def _cmd_refresh(frame): - """Re-pull the current sheet from server.""" - _msgbox("Refresh", "Refresh -- coming soon.") - - -def _cmd_ai_description(frame): - """Generate an AI description from the seller description in the current row.""" - from silo_calc import ai_client as _ai - from silo_calc import dialogs - from silo_calc import pull as _pull - from silo_calc import sheet_format as sf - - if not _ai.is_configured(): - _msgbox( - "AI Describe", - "OpenRouter API key not configured.\n\n" - "Set it in Silo Settings or via the OPENROUTER_API_KEY environment variable.", - box_type="errorbox", - ) - return - - doc, sheet = _get_active_sheet() - if doc is None or sheet is None: - _msgbox("AI Describe", "No active spreadsheet.", box_type="errorbox") - return - - controller = doc.getCurrentController() - selection = controller.getSelection() - try: - cell_addr = selection.getCellAddress() - row = cell_addr.Row - except AttributeError: - try: - range_addr = selection.getRangeAddress() - row = range_addr.StartRow - except AttributeError: - _msgbox("AI Describe", "Select a cell in a BOM row.", box_type="errorbox") - return - - if row == 0: - _msgbox( - "AI Describe", "Select a data row, not the header.", box_type="errorbox" - ) - return - - seller_desc = sheet.getCellByPosition(sf.COL_SELLER_DESC, row).getString().strip() - if not seller_desc: - _msgbox( - "AI Describe", - f"No seller description in column F (row {row + 1}).", - box_type="errorbox", - ) - return - - existing_desc = sheet.getCellByPosition(sf.COL_DESCRIPTION, row).getString().strip() - part_number = sheet.getCellByPosition(sf.COL_PN, row).getString().strip() - category = part_number[:3] if len(part_number) >= 3 else "" - - while True: - try: - ai_desc = _ai.generate_description( - seller_description=seller_desc, - category=category, - existing_description=existing_desc, - part_number=part_number, - ) - except RuntimeError as e: - _msgbox("AI Describe Failed", str(e), box_type="errorbox") - return - - accepted = dialogs.show_ai_description_dialog(seller_desc, ai_desc) - if accepted is not None: - _pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, accepted) - _log(f"AI description written to row {row + 1}: {accepted}") - return - - retry = dialogs._input_box( - "AI Describe", - "Generate again? (yes/no):", - default="no", - ) - if not retry or retry.strip().lower() not in ("yes", "y"): - return - - -# Command dispatch table -_COMMANDS = { - "SiloLogin": _cmd_login, - "SiloPullBOM": _cmd_pull_bom, - "SiloPullProject": _cmd_pull_project, - "SiloPush": _cmd_push, - "SiloAddItem": _cmd_add_item, - "SiloRefresh": _cmd_refresh, - "SiloSettings": _cmd_settings, - "SiloAIDescription": _cmd_ai_description, -} - - -# --------------------------------------------------------------------------- -# UNO Dispatch implementation -# --------------------------------------------------------------------------- - - -class SiloDispatch(unohelper.Base, XDispatch): - """Handles a single dispatched command.""" - - def __init__(self, command: str, frame): - self._command = command - self._frame = frame - self._listeners = [] - - def dispatch(self, url, args): - handler = _COMMANDS.get(self._command) - if handler: - try: - handler(self._frame) - except Exception: - _log(f"Error in {self._command}:\n{traceback.format_exc()}") - try: - _msgbox( - f"Silo Error: {self._command}", - traceback.format_exc(), - box_type="errorbox", - ) - except Exception: - pass - - def addStatusListener(self, listener, url): - self._listeners.append(listener) - - def removeStatusListener(self, listener, url): - if listener in self._listeners: - self._listeners.remove(listener) - - -class SiloDispatchProvider( - unohelper.Base, XDispatchProvider, XInitialization, XServiceInfo -): - """ProtocolHandler component for Silo commands. - - LibreOffice instantiates this via com.sun.star.frame.ProtocolHandler - and calls initialize() with the frame, then queryDispatch() for each - command URL matching our protocol. - """ - - def __init__(self, ctx): - self._ctx = ctx - self._frame = None - - # XInitialization -- called by framework with the Frame - def initialize(self, args): - if args: - self._frame = args[0] - - # XDispatchProvider - def queryDispatch(self, url, target_frame_name, search_flags): - if url.Protocol == _PROTOCOL: - command = url.Path - if command in _COMMANDS: - return SiloDispatch(command, self._frame) - return None - - def queryDispatches(self, requests): - return [ - self.queryDispatch(r.FeatureURL, r.FrameName, r.SearchFlags) - for r in requests - ] - - # XServiceInfo - def getImplementationName(self): - return _IMPL_NAME - - def supportsService(self, name): - return name == _SERVICE_NAME - - def getSupportedServiceNames(self): - return (_SERVICE_NAME,) - - -# UNO component registration -g_ImplementationHelper = unohelper.ImplementationHelper() -g_ImplementationHelper.addImplementation( - SiloDispatchProvider, - _IMPL_NAME, - (_SERVICE_NAME,), -) diff --git a/pkg/calc/tests/__init__.py b/pkg/calc/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pkg/calc/tests/test_basics.py b/pkg/calc/tests/test_basics.py deleted file mode 100644 index b6aeac1..0000000 --- a/pkg/calc/tests/test_basics.py +++ /dev/null @@ -1,345 +0,0 @@ -"""Basic tests for silo_calc modules (no UNO dependency).""" - -import hashlib -import json -import os -import sys -import tempfile -import unittest -from pathlib import Path - -# Add pythonpath to sys.path so we can import without LibreOffice -_pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -_pypath = os.path.join(_pkg_dir, "pythonpath") -if _pypath not in sys.path: - sys.path.insert(0, _pypath) - -from silo_calc import project_files, sync_engine -from silo_calc import settings as _settings -from silo_calc import sheet_format as sf - - -class TestSheetFormat(unittest.TestCase): - def test_bom_header_counts(self): - self.assertEqual(len(sf.BOM_VISIBLE_HEADERS), 11) - self.assertEqual(len(sf.BOM_PROPERTY_HEADERS), 13) - self.assertEqual(len(sf.BOM_SYNC_HEADERS), 4) - self.assertEqual(sf.BOM_TOTAL_COLS, 28) - - def test_column_indices(self): - self.assertEqual(sf.COL_ITEM, 0) - self.assertEqual(sf.COL_PN, 3) - self.assertEqual(sf.COL_UNIT_COST, 6) - self.assertEqual(sf.COL_QTY, 7) - self.assertEqual(sf.COL_EXT_COST, 8) - - def test_detect_sheet_type_bom(self): - headers = [ - "Item", - "Level", - "Source", - "PN", - "Description", - "Seller Description", - "Unit Cost", - "QTY", - "Ext Cost", - ] - self.assertEqual(sf.detect_sheet_type(headers), "bom") - - def test_detect_sheet_type_items(self): - headers = ["PN", "Description", "Type", "Source"] - self.assertEqual(sf.detect_sheet_type(headers), "items") - - def test_detect_sheet_type_unknown(self): - self.assertIsNone(sf.detect_sheet_type([])) - self.assertIsNone(sf.detect_sheet_type(["Foo", "Bar"])) - - def test_col_letter(self): - self.assertEqual(sf.col_letter(0), "A") - self.assertEqual(sf.col_letter(25), "Z") - self.assertEqual(sf.col_letter(26), "AA") - self.assertEqual(sf.col_letter(27), "AB") - - def test_property_key_map_bidirectional(self): - for header, key in sf.PROPERTY_KEY_MAP.items(): - self.assertEqual(sf.DB_FIELD_TO_HEADER[key], header) - - -class TestSyncEngine(unittest.TestCase): - def _make_row(self, pn="F01-0001", desc="Test", cost="10.00", qty="2"): - """Create a minimal BOM row with enough columns.""" - cells = [""] * sf.BOM_TOTAL_COLS - cells[sf.COL_PN] = pn - cells[sf.COL_DESCRIPTION] = desc - cells[sf.COL_UNIT_COST] = cost - cells[sf.COL_QTY] = qty - return cells - - def test_compute_row_hash_deterministic(self): - row = self._make_row() - h1 = sync_engine.compute_row_hash(row) - h2 = sync_engine.compute_row_hash(row) - self.assertEqual(h1, h2) - self.assertEqual(len(h1), 64) # SHA-256 hex - - def test_compute_row_hash_changes(self): - row1 = self._make_row(cost="10.00") - row2 = self._make_row(cost="20.00") - self.assertNotEqual( - sync_engine.compute_row_hash(row1), - sync_engine.compute_row_hash(row2), - ) - - def test_classify_row_new(self): - row = self._make_row() - # No stored hash -> new - self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_NEW) - - def test_classify_row_synced(self): - row = self._make_row() - # Set stored hash to current hash - row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) - row[sf.COL_ROW_STATUS] = "synced" - self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_SYNCED) - - def test_classify_row_modified(self): - row = self._make_row() - row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) - # Now change a cell - row[sf.COL_UNIT_COST] = "99.99" - self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_MODIFIED) - - def test_classify_rows_skips_header(self): - header = list(sf.BOM_ALL_HEADERS) - row1 = self._make_row() - all_rows = [header, row1] - classified = sync_engine.classify_rows(all_rows) - # Header row (index 0) should be skipped - self.assertEqual(len(classified), 1) - self.assertEqual(classified[0][0], 1) # row index - - def test_update_row_sync_state(self): - row = self._make_row() - updated = sync_engine.update_row_sync_state( - row, "synced", updated_at="2025-01-01T00:00:00Z", parent_pn="A01-0003" - ) - self.assertEqual(updated[sf.COL_ROW_STATUS], "synced") - self.assertEqual(updated[sf.COL_UPDATED_AT], "2025-01-01T00:00:00Z") - self.assertEqual(updated[sf.COL_PARENT_PN], "A01-0003") - # Hash should be set - self.assertEqual(len(updated[sf.COL_ROW_HASH]), 64) - - def test_build_push_diff(self): - row_new = self._make_row(pn="NEW-0001") - row_synced = self._make_row(pn="F01-0001") - row_synced[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_synced) - - row_modified = self._make_row(pn="F01-0002") - row_modified[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_modified) - row_modified[sf.COL_UNIT_COST] = "999.99" # change after hash - - classified = [ - (1, sync_engine.STATUS_NEW, row_new), - (2, sync_engine.STATUS_SYNCED, row_synced), - (3, sync_engine.STATUS_MODIFIED, row_modified), - ] - diff = sync_engine.build_push_diff(classified) - self.assertEqual(len(diff["new"]), 1) - self.assertEqual(len(diff["modified"]), 1) - self.assertEqual(diff["unchanged"], 1) - self.assertEqual(len(diff["conflicts"]), 0) - - def test_conflict_detection(self): - row = self._make_row(pn="F01-0001") - row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) - row[sf.COL_UPDATED_AT] = "2025-01-01T00:00:00Z" - row[sf.COL_UNIT_COST] = "changed" # local modification - - classified = [(1, sync_engine.STATUS_MODIFIED, row)] - diff = sync_engine.build_push_diff( - classified, - server_timestamps={ - "F01-0001": "2025-06-01T00:00:00Z" - }, # server changed too - ) - self.assertEqual(len(diff["conflicts"]), 1) - self.assertEqual(len(diff["modified"]), 0) - - -class TestSettings(unittest.TestCase): - def test_load_defaults(self): - # Use a temp dir so we don't touch real settings - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - cfg = _settings.load() - self.assertEqual(cfg["ssl_verify"], True) - self.assertEqual(cfg["default_schema"], "kindred-rd") - - def test_save_and_load(self): - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("api_url", "https://silo.test/api") - cfg = _settings.load() - self.assertEqual(cfg["api_url"], "https://silo.test/api") - - def test_save_auth_and_clear(self): - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.save_auth("testuser", "editor", "local", "silo_abc123") - cfg = _settings.load() - self.assertEqual(cfg["auth_username"], "testuser") - self.assertEqual(cfg["api_token"], "silo_abc123") - - _settings.clear_auth() - cfg = _settings.load() - self.assertEqual(cfg["api_token"], "") - self.assertEqual(cfg["auth_username"], "") - - -class TestProjectFiles(unittest.TestCase): - def test_get_project_sheet_path(self): - path = project_files.get_project_sheet_path("3DX10") - self.assertTrue(str(path).endswith("sheets/3DX10/3DX10.ods")) - - def test_save_and_read(self): - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("projects_dir", tmp) - - test_data = b"PK\x03\x04fake-ods-content" - path = project_files.save_project_sheet("TEST", test_data) - self.assertTrue(path.is_file()) - self.assertEqual(path.name, "TEST.ods") - - read_back = project_files.read_project_sheet("TEST") - self.assertEqual(read_back, test_data) - - def test_list_project_sheets(self): - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("projects_dir", tmp) - - # Create two project dirs - for code in ("AAA", "BBB"): - d = Path(tmp) / "sheets" / code - d.mkdir(parents=True) - (d / f"{code}.ods").write_bytes(b"fake") - - sheets = project_files.list_project_sheets() - codes = [s[0] for s in sheets] - self.assertIn("AAA", codes) - self.assertIn("BBB", codes) - - -class TestAIClient(unittest.TestCase): - """Test ai_client helpers that don't require network or UNO.""" - - def test_default_constants(self): - from silo_calc import ai_client - - self.assertTrue(ai_client.OPENROUTER_API_URL.startswith("https://")) - self.assertTrue(len(ai_client.DEFAULT_MODEL) > 0) - self.assertTrue(len(ai_client.DEFAULT_INSTRUCTIONS) > 0) - - def test_is_configured_false_by_default(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - old = os.environ.pop("OPENROUTER_API_KEY", None) - try: - self.assertFalse(ai_client.is_configured()) - finally: - if old is not None: - os.environ["OPENROUTER_API_KEY"] = old - - def test_is_configured_with_env_var(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - old = os.environ.get("OPENROUTER_API_KEY") - os.environ["OPENROUTER_API_KEY"] = "sk-test-key" - try: - self.assertTrue(ai_client.is_configured()) - finally: - if old is not None: - os.environ["OPENROUTER_API_KEY"] = old - else: - os.environ.pop("OPENROUTER_API_KEY", None) - - def test_is_configured_with_settings(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("openrouter_api_key", "sk-test-key") - old = os.environ.pop("OPENROUTER_API_KEY", None) - try: - self.assertTrue(ai_client.is_configured()) - finally: - if old is not None: - os.environ["OPENROUTER_API_KEY"] = old - - def test_chat_completion_missing_key_raises(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - old = os.environ.pop("OPENROUTER_API_KEY", None) - try: - with self.assertRaises(RuntimeError) as ctx: - ai_client.chat_completion([{"role": "user", "content": "test"}]) - self.assertIn("not configured", str(ctx.exception)) - finally: - if old is not None: - os.environ["OPENROUTER_API_KEY"] = old - - def test_get_model_default(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - self.assertEqual(ai_client._get_model(), ai_client.DEFAULT_MODEL) - - def test_get_model_from_settings(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("openrouter_model", "anthropic/claude-3-haiku") - self.assertEqual(ai_client._get_model(), "anthropic/claude-3-haiku") - - def test_get_instructions_default(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - self.assertEqual( - ai_client._get_instructions(), ai_client.DEFAULT_INSTRUCTIONS - ) - - def test_get_instructions_from_settings(self): - from silo_calc import ai_client - - with tempfile.TemporaryDirectory() as tmp: - _settings._SETTINGS_DIR = Path(tmp) - _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" - _settings.put("openrouter_instructions", "Custom instructions") - self.assertEqual(ai_client._get_instructions(), "Custom instructions") - - -if __name__ == "__main__": - unittest.main()