commit 13b56fd1b01ba3fa6eadaaf56591e96489f17619 Author: Zoe Forbes Date: Fri Feb 6 11:14:54 2026 -0600 initial: LibreOffice Calc Silo extension (extracted from silo monorepo) LibreOffice Calc extension for Silo PLM integration. Uses shared silo-client package (submodule) for API communication. Changes from monorepo version: - SiloClient class removed from client.py, replaced with CalcSiloSettings adapter + factory function wrapping silo_client.SiloClient - silo_calc_component.py adds silo-client to sys.path - Makefile build-oxt copies silo_client into .oxt for self-contained packaging - All other modules unchanged diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..43023f5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "silo-client"] + path = silo-client + url = https://git.kindred-systems.com/kindred/silo-client.git diff --git a/Addons.xcu b/Addons.xcu new file mode 100644 index 0000000..4d07ca3 --- /dev/null +++ b/Addons.xcu @@ -0,0 +1,235 @@ + + + + + + + + + + + io.kindredsystems.silo.calc:SiloLogin + + + Login + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPullBOM + + + Pull BOM + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPullProject + + + Pull Project + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPush + + + Push + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloAddItem + + + Add Item + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloRefresh + + + Refresh + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloSettings + + + Settings + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloAIDescription + + + AI Describe + + + _self + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + + + + + + io.kindredsystems.silo.calc:SiloLogin + + + Silo: Login + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPullBOM + + + Silo: Pull BOM + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPullProject + + + Silo: Pull Project Items + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloPush + + + Silo: Push Changes + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloAddItem + + + Silo: Add Item + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloRefresh + + + Silo: Refresh + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloSettings + + + Silo: Settings + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + io.kindredsystems.silo.calc:SiloAIDescription + + + Silo: AI Describe + + + com.sun.star.sheet.SpreadsheetDocument + + + + + + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e2c094f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Kindred Systems LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/META-INF/manifest.xml b/META-INF/manifest.xml new file mode 100644 index 0000000..b5a62eb --- /dev/null +++ b/META-INF/manifest.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d13aa59 --- /dev/null +++ b/Makefile @@ -0,0 +1,52 @@ +.PHONY: build-oxt install uninstall install-dev test clean help + +# Build .oxt extension package (self-contained with silo_client) +build-oxt: + @echo "Building silo-calc.oxt..." + @rm -rf _oxt_build && mkdir _oxt_build + @cp -r pythonpath description META-INF *.py *.xml *.xcu _oxt_build/ + @cp -r silo-client/silo_client _oxt_build/pythonpath/silo_client + @cd _oxt_build && zip -r ../silo-calc.oxt . -x '*.pyc' '*__pycache__/*' + @rm -rf _oxt_build + @echo "Built silo-calc.oxt" + +# Install extension system-wide (requires unopkg) +install: build-oxt + unopkg add --shared silo-calc.oxt 2>/dev/null || unopkg add silo-calc.oxt + @echo "Installed silo-calc extension. Restart LibreOffice to load." + +# Uninstall extension +uninstall: + unopkg remove io.kindredsystems.silo.calc 2>/dev/null || true + @echo "Uninstalled silo-calc extension." + +# Development install: symlink into user extensions dir +install-dev: + @CALC_EXT_DIR="$${HOME}/.config/libreoffice/4/user/extensions"; \ + if [ -d "$$CALC_EXT_DIR" ]; then \ + rm -rf "$$CALC_EXT_DIR/silo-calc"; \ + ln -sf $(PWD) "$$CALC_EXT_DIR/silo-calc"; \ + echo "Symlinked to $$CALC_EXT_DIR/silo-calc"; \ + else \ + echo "LibreOffice extensions dir not found at $$CALC_EXT_DIR"; \ + echo "Try: make install (uses unopkg)"; \ + fi + @echo "Restart LibreOffice to load the Silo Calc extension" + +# Run Python tests +test: + python3 -m unittest tests/test_basics.py -v + +# Clean build artifacts +clean: + rm -f silo-calc.oxt + rm -rf _oxt_build + +help: + @echo "silo-calc targets:" + @echo " build-oxt - Build .oxt extension package" + @echo " install - Install extension (uses unopkg)" + @echo " install-dev - Symlink for development" + @echo " uninstall - Remove extension" + @echo " test - Run Python tests" + @echo " clean - Remove build artifacts" diff --git a/ProtocolHandler.xcu b/ProtocolHandler.xcu new file mode 100644 index 0000000..e50f5eb --- /dev/null +++ b/ProtocolHandler.xcu @@ -0,0 +1,14 @@ + + + + + + io.kindredsystems.silo.calc:* + + + + diff --git a/README.md b/README.md new file mode 100644 index 0000000..2fb2e1b --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# silo-calc + +LibreOffice Calc extension for the Silo PLM system. Provides BOM pull/push, +project sheet management, and AI-assisted descriptions. + +## Dependencies + +- [silo-client](https://git.kindred-systems.com/kindred/silo-client.git) -- included as a git submodule + +## Build + +```bash +git submodule update --init +make build-oxt +``` + +This produces `silo-calc.oxt` with the shared `silo_client` package bundled inside. + +## Install + +```bash +make install # system-wide via unopkg +make install-dev # symlink for development +``` + +## Test + +```bash +make test +``` diff --git a/description.xml b/description.xml new file mode 100644 index 0000000..6af2111 --- /dev/null +++ b/description.xml @@ -0,0 +1,27 @@ + + + + + + + + Silo - Spreadsheet Sync + + + + Kindred Systems + + + + + + + + + + + + + diff --git a/description/description_en.txt b/description/description_en.txt new file mode 100644 index 0000000..5aeb431 --- /dev/null +++ b/description/description_en.txt @@ -0,0 +1,15 @@ +Silo Spreadsheet Sync for LibreOffice Calc + +Bidirectional sync between LibreOffice Calc spreadsheets and the Silo +parts database. Pull project BOMs, edit in Calc, push changes back. + +Features: +- Pull BOM: fetch an expanded bill of materials as a formatted sheet +- Pull Project: fetch all items tagged with a project code +- Push: sync local edits (new items, modified fields) back to the database +- Add Item wizard: guided workflow for adding new BOM entries +- PN conflict resolution: handle duplicate part numbers gracefully +- Auto project tagging: items in a working BOM are tagged with the project + +Toolbar commands appear when a Calc spreadsheet is active. +Settings and API token are stored in ~/.config/silo/calc-settings.json. diff --git a/pythonpath/silo_calc/__init__.py b/pythonpath/silo_calc/__init__.py new file mode 100644 index 0000000..92e851c --- /dev/null +++ b/pythonpath/silo_calc/__init__.py @@ -0,0 +1,3 @@ +"""Silo LibreOffice Calc extension -- spreadsheet sync for project data.""" + +__version__ = "0.1.0" diff --git a/pythonpath/silo_calc/__pycache__/__init__.cpython-313.pyc b/pythonpath/silo_calc/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..0de3fd2 Binary files /dev/null and b/pythonpath/silo_calc/__pycache__/__init__.cpython-313.pyc differ diff --git a/pythonpath/silo_calc/__pycache__/client.cpython-313.pyc b/pythonpath/silo_calc/__pycache__/client.cpython-313.pyc new file mode 100644 index 0000000..17741c6 Binary files /dev/null and b/pythonpath/silo_calc/__pycache__/client.cpython-313.pyc differ diff --git a/pythonpath/silo_calc/__pycache__/settings.cpython-313.pyc b/pythonpath/silo_calc/__pycache__/settings.cpython-313.pyc new file mode 100644 index 0000000..77828e8 Binary files /dev/null and b/pythonpath/silo_calc/__pycache__/settings.cpython-313.pyc differ diff --git a/pythonpath/silo_calc/ai_client.py b/pythonpath/silo_calc/ai_client.py new file mode 100644 index 0000000..5d81aa9 --- /dev/null +++ b/pythonpath/silo_calc/ai_client.py @@ -0,0 +1,217 @@ +"""OpenRouter AI client for the Silo Calc extension. + +Provides AI-powered text generation via the OpenRouter API +(https://openrouter.ai/api/v1/chat/completions). Uses stdlib urllib +only -- no external dependencies. + +The core ``chat_completion()`` function is generic and reusable for +future features (price analysis, sourcing assistance). Domain helpers +like ``generate_description()`` build on top of it. +""" + +import json +import os +import ssl +import urllib.error +import urllib.request +from typing import Any, Dict, List, Optional + +from . import settings as _settings + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions" + +DEFAULT_MODEL = "openai/gpt-4.1-nano" + +DEFAULT_INSTRUCTIONS = ( + "You are a parts librarian for an engineering company. " + "Given a seller's product description, produce a concise, standardized " + "part description suitable for a Bill of Materials. Rules:\n" + "- Maximum 60 characters\n" + "- Use title case\n" + "- Start with the component type (e.g., Bolt, Resistor, Bearing)\n" + "- Include key specifications (size, rating, material) in order of importance\n" + "- Omit brand names, marketing language, and redundant words\n" + "- Use standard engineering abbreviations (SS, Al, M3, 1/4-20)\n" + "- Output ONLY the description, no quotes or explanation" +) + +# --------------------------------------------------------------------------- +# SSL helper (same pattern as client.py) +# --------------------------------------------------------------------------- + + +def _get_ssl_context() -> ssl.SSLContext: + """Build an SSL context for OpenRouter API calls.""" + ctx = ssl.create_default_context() + for ca_path in ( + "/etc/ssl/certs/ca-certificates.crt", + "/etc/pki/tls/certs/ca-bundle.crt", + ): + if os.path.isfile(ca_path): + try: + ctx.load_verify_locations(ca_path) + except Exception: + pass + break + return ctx + + +# --------------------------------------------------------------------------- +# Settings resolution helpers +# --------------------------------------------------------------------------- + + +def _get_api_key() -> str: + """Resolve the OpenRouter API key from settings or environment.""" + cfg = _settings.load() + key = cfg.get("openrouter_api_key", "") + if not key: + key = os.environ.get("OPENROUTER_API_KEY", "") + return key + + +def _get_model() -> str: + """Resolve the model slug from settings or default.""" + cfg = _settings.load() + return cfg.get("openrouter_model", "") or DEFAULT_MODEL + + +def _get_instructions() -> str: + """Resolve the system instructions from settings or default.""" + cfg = _settings.load() + return cfg.get("openrouter_instructions", "") or DEFAULT_INSTRUCTIONS + + +# --------------------------------------------------------------------------- +# Core API function +# --------------------------------------------------------------------------- + + +def chat_completion( + messages: List[Dict[str, str]], + model: Optional[str] = None, + temperature: float = 0.3, + max_tokens: int = 200, +) -> str: + """Send a chat completion request to OpenRouter. + + Parameters + ---------- + messages : list of {"role": str, "content": str} + model : model slug (default: from settings or DEFAULT_MODEL) + temperature : sampling temperature + max_tokens : maximum response tokens + + Returns + ------- + str : the assistant's response text + + Raises + ------ + RuntimeError : on missing API key, HTTP errors, network errors, + or unexpected response format. + """ + api_key = _get_api_key() + if not api_key: + raise RuntimeError( + "OpenRouter API key not configured. " + "Set it in Settings or the OPENROUTER_API_KEY environment variable." + ) + + model = model or _get_model() + + payload = { + "model": model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + } + + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/kindredsystems/silo", + "X-Title": "Silo Calc Extension", + } + + body = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + OPENROUTER_API_URL, data=body, headers=headers, method="POST" + ) + + try: + with urllib.request.urlopen( + req, context=_get_ssl_context(), timeout=30 + ) as resp: + result = json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + error_body = e.read().decode("utf-8", errors="replace") + if e.code == 401: + raise RuntimeError("OpenRouter API key is invalid or expired.") + if e.code == 402: + raise RuntimeError("OpenRouter account has insufficient credits.") + if e.code == 429: + raise RuntimeError("OpenRouter rate limit exceeded. Try again shortly.") + raise RuntimeError(f"OpenRouter API error {e.code}: {error_body}") + except urllib.error.URLError as e: + raise RuntimeError(f"Network error contacting OpenRouter: {e.reason}") + + choices = result.get("choices", []) + if not choices: + raise RuntimeError("OpenRouter returned an empty response.") + + return choices[0].get("message", {}).get("content", "").strip() + + +# --------------------------------------------------------------------------- +# Domain helpers +# --------------------------------------------------------------------------- + + +def generate_description( + seller_description: str, + category: str = "", + existing_description: str = "", + part_number: str = "", +) -> str: + """Generate a standardized part description from a seller description. + + Parameters + ---------- + seller_description : the raw seller/vendor description text + category : category code (e.g. "F01") for context + existing_description : current description in col E, if any + part_number : the part number, for context + + Returns + ------- + str : the AI-generated standardized description + """ + system_prompt = _get_instructions() + + user_parts = [] + if category: + user_parts.append(f"Category: {category}") + if part_number: + user_parts.append(f"Part Number: {part_number}") + if existing_description: + user_parts.append(f"Current Description: {existing_description}") + user_parts.append(f"Seller Description: {seller_description}") + + user_prompt = "\n".join(user_parts) + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + return chat_completion(messages) + + +def is_configured() -> bool: + """Return True if the OpenRouter API key is available.""" + return bool(_get_api_key()) diff --git a/pythonpath/silo_calc/client.py b/pythonpath/silo_calc/client.py new file mode 100644 index 0000000..62269de --- /dev/null +++ b/pythonpath/silo_calc/client.py @@ -0,0 +1,54 @@ +"""Silo API client for LibreOffice Calc extension. + +Thin wrapper around the shared ``silo_client`` package. Provides a +``CalcSiloSettings`` adapter that reads/writes the local JSON settings +file, and re-exports ``SiloClient`` so existing ``from .client import +SiloClient`` imports continue to work unchanged. +""" + +import os +import urllib.parse + +from silo_client import SiloClient as _BaseSiloClient +from silo_client import SiloSettings + +from . import settings as _settings + + +class CalcSiloSettings(SiloSettings): + """Settings adapter backed by ``~/.config/silo/calc-settings.json``.""" + + def get_api_url(self) -> str: + cfg = _settings.load() + url = cfg.get("api_url", "").rstrip("/") + if not url: + url = os.environ.get("SILO_API_URL", "http://localhost:8080/api") + parsed = urllib.parse.urlparse(url) + if not parsed.path or parsed.path == "/": + url = url + "/api" + return url + + def get_api_token(self) -> str: + return _settings.load().get("api_token", "") or os.environ.get( + "SILO_API_TOKEN", "" + ) + + def get_ssl_verify(self) -> bool: + return _settings.load().get("ssl_verify", True) + + def get_ssl_cert_path(self) -> str: + return _settings.load().get("ssl_cert_path", "") + + def save_auth(self, username, role, source, token): + _settings.save_auth(username=username, role=role, source=source, token=token) + + def clear_auth(self): + _settings.clear_auth() + + +_calc_settings = CalcSiloSettings() + + +def SiloClient(base_url=None): + """Factory matching the old ``SiloClient(base_url=...)`` constructor.""" + return _BaseSiloClient(_calc_settings, base_url=base_url) diff --git a/pythonpath/silo_calc/completion_wizard.py b/pythonpath/silo_calc/completion_wizard.py new file mode 100644 index 0000000..42b3119 --- /dev/null +++ b/pythonpath/silo_calc/completion_wizard.py @@ -0,0 +1,395 @@ +"""Completion Wizard for adding new items to a BOM sheet. + +Three-step guided workflow: +1. Category selection (from schema) +2. Required fields (Description, optional PN) +3. Common fields (Source, Unit Cost, QTY, Sourcing Link, category-specific properties) + +If a manually entered PN already exists, the PN Conflict Resolution dialog +is shown. +""" + +from typing import Any, Dict, List, Optional, Tuple + +from . import ai_client as _ai +from . import dialogs, sync_engine +from . import settings as _settings +from . import sheet_format as sf +from .client import SiloClient + +# UNO imports +try: + import uno + + _HAS_UNO = True + + _HAS_UNO = True +except ImportError: + _HAS_UNO = False + +# Category prefix descriptions for grouping in the picker +_PREFIX_GROUPS = { + "F": "Fasteners", + "C": "Fittings", + "R": "Motion", + "S": "Structural", + "E": "Electrical", + "M": "Mechanical", + "T": "Tooling", + "A": "Assemblies", + "P": "Purchased", + "X": "Custom Fabricated", +} + +# Default sourcing type by category prefix +_DEFAULT_SOURCING = { + "A": "M", # assemblies are manufactured + "X": "M", # custom fab is manufactured + "T": "M", # tooling is manufactured +} + + +def _get_categories( + client: SiloClient, schema: str = "kindred-rd" +) -> List[Tuple[str, str]]: + """Fetch category codes and descriptions from the schema. + + Returns list of (code, description) tuples sorted by code. + """ + try: + schema_data = client.get_schema(schema) + segments = schema_data.get("segments", []) + cat_segment = None + for seg in segments: + if seg.get("name") == "category": + cat_segment = seg + break + if cat_segment and cat_segment.get("values"): + return sorted(cat_segment["values"].items()) + except RuntimeError: + pass + return [] + + +def _get_category_properties( + client: SiloClient, category: str, schema: str = "kindred-rd" +) -> List[str]: + """Fetch property field names relevant to a category. + + Returns the list of property keys that apply to the category's prefix group. + """ + try: + prop_schema = client.get_property_schema(schema) + # prop_schema has global defaults and category-specific overrides + defaults = prop_schema.get("defaults", {}) + category_props = prop_schema.get("categories", {}).get(category[:1], {}) + # Merge: category-specific fields + global defaults + all_keys = set(defaults.keys()) + all_keys.update(category_props.keys()) + return sorted(all_keys) + except RuntimeError: + return list(sf.PROPERTY_KEY_MAP.values()) + + +# --------------------------------------------------------------------------- +# Wizard dialog (UNO) +# --------------------------------------------------------------------------- + + +def run_completion_wizard( + client: SiloClient, + doc, + sheet, + insert_row: int, + project_code: str = "", + schema: str = "kindred-rd", +) -> bool: + """Run the item completion wizard. Returns True if a row was inserted. + + Parameters + ---------- + client : SiloClient + doc : XSpreadsheetDocument + sheet : XSpreadsheet + insert_row : int (0-based row index to insert at) + project_code : str (for auto-tagging) + schema : str + """ + if not _HAS_UNO: + return False + + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + + # -- Step 1: Category selection ----------------------------------------- + categories = _get_categories(client, schema) + if not categories: + dialogs._msgbox( + None, + "Add Item", + "Could not fetch categories from server.", + box_type="errorbox", + ) + return False + + # Build display list grouped by prefix + cat_display = [] + for code, desc in categories: + prefix = code[0] if code else "?" + group = _PREFIX_GROUPS.get(prefix, "Other") + cat_display.append(f"{code} - {desc} [{group}]") + + # Use a simple input box with the category list as hint + # (A proper ListBox dialog would be more polished but this is functional) + cat_hint = ", ".join(c[0] for c in categories[:20]) + if len(categories) > 20: + cat_hint += f"... ({len(categories)} total)" + + category_input = dialogs._input_box( + "Add Item - Step 1/3", + f"Category code ({cat_hint}):", + ) + if not category_input: + return False + category = category_input.strip().upper() + + # Validate category + valid_codes = {c[0] for c in categories} + if category not in valid_codes: + dialogs._msgbox( + None, + "Add Item", + f"Unknown category: {category}", + box_type="errorbox", + ) + return False + + # -- Step 2: Required fields -------------------------------------------- + description = dialogs._input_box( + "Add Item - Step 2/3", + "Description (required, leave blank to use AI):", + ) + + # If blank and AI is configured, offer AI generation from seller description + if (not description or not description.strip()) and _ai.is_configured(): + seller_desc = dialogs._input_box( + "Add Item - AI Description", + "Paste the seller description for AI generation:", + ) + if seller_desc and seller_desc.strip(): + try: + ai_desc = _ai.generate_description( + seller_description=seller_desc.strip(), + category=category, + ) + accepted = dialogs.show_ai_description_dialog( + seller_desc.strip(), ai_desc + ) + if accepted: + description = accepted + except RuntimeError as e: + dialogs._msgbox( + None, + "AI Description Failed", + str(e), + box_type="errorbox", + ) + + if not description or not description.strip(): + dialogs._msgbox( + None, "Add Item", "Description is required.", box_type="errorbox" + ) + return False + + manual_pn = dialogs._input_box( + "Add Item - Step 2/3", + "Part number (leave blank for auto-generation):", + ) + + # Check for PN conflict if user entered one + use_existing_item = None + if manual_pn and manual_pn.strip(): + manual_pn = manual_pn.strip() + try: + existing = client.get_item(manual_pn) + # PN exists -- show conflict dialog + result = dialogs.show_pn_conflict_dialog(manual_pn, existing) + if result == dialogs.PN_USE_EXISTING: + use_existing_item = existing + elif result == dialogs.PN_CREATE_NEW: + manual_pn = "" # will auto-generate + else: + return False # cancelled + except RuntimeError: + pass # PN doesn't exist, which is fine + + # -- Step 3: Common fields ---------------------------------------------- + prefix = category[0] if category else "" + default_source = _DEFAULT_SOURCING.get(prefix, "P") + + source = dialogs._input_box( + "Add Item - Step 3/3", + f"Sourcing type (M=manufactured, P=purchased) [default: {default_source}]:", + default=default_source, + ) + if source is None: + return False + source = source.strip().upper() or default_source + + unit_cost_str = dialogs._input_box( + "Add Item - Step 3/3", + "Unit cost (e.g. 10.50):", + default="0", + ) + unit_cost = 0.0 + if unit_cost_str: + try: + unit_cost = float(unit_cost_str.strip().replace("$", "").replace(",", "")) + except ValueError: + pass + + qty_str = dialogs._input_box( + "Add Item - Step 3/3", + "Quantity [default: 1]:", + default="1", + ) + qty = 1.0 + if qty_str: + try: + qty = float(qty_str.strip()) + except ValueError: + pass + + sourcing_link = ( + dialogs._input_box( + "Add Item - Step 3/3", + "Sourcing link (URL, optional):", + ) + or "" + ) + + # -- Create item or use existing ---------------------------------------- + created_item = None + if use_existing_item: + # Use the existing item's data + created_item = use_existing_item + final_pn = use_existing_item.get("part_number", manual_pn) + elif manual_pn: + # Create with the user's manual PN + try: + created_item = client.create_item( + schema=schema, + category=category, + description=description.strip(), + projects=[project_code] if project_code else None, + sourcing_type=source, + sourcing_link=sourcing_link.strip(), + standard_cost=unit_cost if unit_cost else None, + ) + final_pn = created_item.get("part_number", manual_pn) + except RuntimeError as e: + dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox") + return False + else: + # Auto-generate PN + try: + created_item = client.create_item( + schema=schema, + category=category, + description=description.strip(), + projects=[project_code] if project_code else None, + sourcing_type=source, + sourcing_link=sourcing_link.strip(), + standard_cost=unit_cost if unit_cost else None, + ) + final_pn = created_item.get("part_number", "") + except RuntimeError as e: + dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox") + return False + + if not final_pn: + dialogs._msgbox( + None, "Add Item", "No part number returned.", box_type="errorbox" + ) + return False + + # Auto-tag with project if needed + if project_code and created_item and not use_existing_item: + try: + client.add_item_projects(final_pn, [project_code]) + except RuntimeError: + pass + + # -- Insert row into sheet ---------------------------------------------- + _insert_bom_row( + sheet, + insert_row, + pn=final_pn, + description=created_item.get("description", description.strip()) + if created_item + else description.strip(), + unit_cost=unit_cost, + qty=qty, + sourcing_link=sourcing_link.strip(), + schema=schema, + status=sync_engine.STATUS_NEW, + parent_pn="", + ) + + return True + + +def _insert_bom_row( + sheet, + row: int, + pn: str, + description: str, + source: str, + unit_cost: float, + qty: float, + sourcing_link: str, + schema: str, + status: str, + parent_pn: str, +): + """Write a single BOM row at the given position with sync tracking.""" + from . import pull as _pull # avoid circular import at module level + + _pull._set_cell_string(sheet, sf.COL_ITEM, row, "") + _pull._set_cell_string(sheet, sf.COL_LEVEL, row, "") + _pull._set_cell_string(sheet, sf.COL_SOURCE, row, source) + _pull._set_cell_string(sheet, sf.COL_PN, row, pn) + _pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, description) + _pull._set_cell_string(sheet, sf.COL_SELLER_DESC, row, "") + + if unit_cost: + _pull._set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost) + _pull._set_cell_float(sheet, sf.COL_QTY, row, qty) + + # Ext Cost formula + ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" + _pull._set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) + + _pull._set_cell_string(sheet, sf.COL_SOURCING_LINK, row, sourcing_link) + _pull._set_cell_string(sheet, sf.COL_SCHEMA, row, schema) + + # Build row cells for hash computation + row_cells = [""] * sf.BOM_TOTAL_COLS + row_cells[sf.COL_SOURCE] = source + row_cells[sf.COL_PN] = pn + row_cells[sf.COL_DESCRIPTION] = description + row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else "" + row_cells[sf.COL_QTY] = str(qty) + row_cells[sf.COL_SOURCING_LINK] = sourcing_link + row_cells[sf.COL_SCHEMA] = schema + + sync_engine.update_row_sync_state(row_cells, status, parent_pn=parent_pn) + _pull._set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH]) + _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS]) + _pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT]) + _pull._set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN]) + + # Colour the row + color = _pull._STATUS_COLORS.get(status) + if color: + _pull._set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, color) diff --git a/pythonpath/silo_calc/dialogs.py b/pythonpath/silo_calc/dialogs.py new file mode 100644 index 0000000..7028735 --- /dev/null +++ b/pythonpath/silo_calc/dialogs.py @@ -0,0 +1,667 @@ +"""UNO dialogs for the Silo Calc extension. + +Provides login, settings, push summary, and PN conflict resolution dialogs. +All dialogs use the UNO dialog toolkit (``com.sun.star.awt``). +""" + +from typing import Any, Dict, List, Optional, Tuple + +# UNO imports are only available inside LibreOffice +try: + import uno + + _HAS_UNO = True +except ImportError: + _HAS_UNO = False + +from . import settings as _settings +from .client import SiloClient + + +def _get_desktop(): + """Return the XSCRIPTCONTEXT desktop, or resolve via component context.""" + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx) + + +def _msgbox(parent, title: str, message: str, box_type="infobox"): + """Show a simple message box.""" + if not _HAS_UNO: + return + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + if parent is None: + parent = _get_desktop().getCurrentFrame().getContainerWindow() + mbt = uno.Enum( + "com.sun.star.awt.MessageBoxType", + "INFOBOX" if box_type == "infobox" else "ERRORBOX", + ) + msg_box = toolkit.createMessageBox(parent, mbt, 1, title, message) + msg_box.execute() + + +def _input_box( + title: str, label: str, default: str = "", password: bool = False +) -> Optional[str]: + """Show a simple single-field input dialog. Returns None on cancel.""" + if not _HAS_UNO: + return None + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + dlg_provider = smgr.createInstanceWithContext( + "com.sun.star.awt.DialogProvider", ctx + ) + + # Build dialog model programmatically + dlg_model = smgr.createInstanceWithContext( + "com.sun.star.awt.UnoControlDialogModel", ctx + ) + dlg_model.Width = 220 + dlg_model.Height = 80 + dlg_model.Title = title + + # Label + lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl.Name = "lbl" + lbl.PositionX = 10 + lbl.PositionY = 10 + lbl.Width = 200 + lbl.Height = 12 + lbl.Label = label + dlg_model.insertByName("lbl", lbl) + + # Text field + tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf.Name = "tf" + tf.PositionX = 10 + tf.PositionY = 24 + tf.Width = 200 + tf.Height = 14 + tf.Text = default + if password: + tf.EchoChar = ord("*") + dlg_model.insertByName("tf", tf) + + # OK button + btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_ok.Name = "btn_ok" + btn_ok.PositionX = 110 + btn_ok.PositionY = 50 + btn_ok.Width = 45 + btn_ok.Height = 16 + btn_ok.Label = "OK" + btn_ok.PushButtonType = 1 # OK + dlg_model.insertByName("btn_ok", btn_ok) + + # Cancel button + btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_cancel.Name = "btn_cancel" + btn_cancel.PositionX = 160 + btn_cancel.PositionY = 50 + btn_cancel.Width = 45 + btn_cancel.Height = 16 + btn_cancel.Label = "Cancel" + btn_cancel.PushButtonType = 2 # CANCEL + dlg_model.insertByName("btn_cancel", btn_cancel) + + # Create dialog control + dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) + dlg.setModel(dlg_model) + dlg.setVisible(False) + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + dlg.createPeer(toolkit, None) + + result = dlg.execute() + if result == 1: # OK + text = dlg.getControl("tf").getText() + dlg.dispose() + return text + dlg.dispose() + return None + + +# --------------------------------------------------------------------------- +# Login dialog +# --------------------------------------------------------------------------- + + +def show_login_dialog(parent=None) -> bool: + """Two-step login: username then password. Returns True on success.""" + username = _input_box("Silo Login", "Username:") + if not username: + return False + password = _input_box("Silo Login", f"Password for {username}:", password=True) + if not password: + return False + + client = SiloClient() + try: + result = client.login(username, password) + _msgbox( + parent, + "Silo Login", + f"Logged in as {result['username']} ({result.get('role', 'viewer')})", + ) + return True + except RuntimeError as e: + _msgbox(parent, "Silo Login Failed", str(e), box_type="errorbox") + return False + + +# --------------------------------------------------------------------------- +# Settings dialog +# --------------------------------------------------------------------------- + + +def show_settings_dialog(parent=None) -> bool: + """Show the settings dialog. Returns True if saved.""" + if not _HAS_UNO: + return False + + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + cfg = _settings.load() + + dlg_model = smgr.createInstanceWithContext( + "com.sun.star.awt.UnoControlDialogModel", ctx + ) + dlg_model.Width = 300 + dlg_model.Height = 200 + dlg_model.Title = "Silo Settings" + + fields = [ + ("API URL", "api_url", cfg.get("api_url", "")), + ("API Token", "api_token", cfg.get("api_token", "")), + ("SSL Cert Path", "ssl_cert_path", cfg.get("ssl_cert_path", "")), + ("Projects Dir", "projects_dir", cfg.get("projects_dir", "")), + ("Default Schema", "default_schema", cfg.get("default_schema", "kindred-rd")), + ] + + y = 10 + for label_text, name, default in fields: + lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl.Name = f"lbl_{name}" + lbl.PositionX = 10 + lbl.PositionY = y + lbl.Width = 80 + lbl.Height = 12 + lbl.Label = label_text + dlg_model.insertByName(f"lbl_{name}", lbl) + + tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf.Name = f"tf_{name}" + tf.PositionX = 95 + tf.PositionY = y + tf.Width = 195 + tf.Height = 14 + tf.Text = default + dlg_model.insertByName(f"tf_{name}", tf) + y += 22 + + # SSL verify checkbox + cb = dlg_model.createInstance("com.sun.star.awt.UnoControlCheckBoxModel") + cb.Name = "cb_ssl_verify" + cb.PositionX = 95 + cb.PositionY = y + cb.Width = 120 + cb.Height = 14 + cb.Label = "Verify SSL" + cb.State = 1 if cfg.get("ssl_verify", True) else 0 + dlg_model.insertByName("cb_ssl_verify", cb) + y += 22 + + # --- OpenRouter AI section --- + lbl_ai = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_ai.Name = "lbl_ai_section" + lbl_ai.PositionX = 10 + lbl_ai.PositionY = y + lbl_ai.Width = 280 + lbl_ai.Height = 12 + lbl_ai.Label = "--- OpenRouter AI ---" + dlg_model.insertByName("lbl_ai_section", lbl_ai) + y += 16 + + # API Key (masked) + lbl_key = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_key.Name = "lbl_openrouter_api_key" + lbl_key.PositionX = 10 + lbl_key.PositionY = y + lbl_key.Width = 80 + lbl_key.Height = 12 + lbl_key.Label = "API Key" + dlg_model.insertByName("lbl_openrouter_api_key", lbl_key) + + tf_key = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf_key.Name = "tf_openrouter_api_key" + tf_key.PositionX = 95 + tf_key.PositionY = y + tf_key.Width = 195 + tf_key.Height = 14 + tf_key.Text = cfg.get("openrouter_api_key", "") + tf_key.EchoChar = ord("*") + dlg_model.insertByName("tf_openrouter_api_key", tf_key) + y += 22 + + # AI Model + lbl_model = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_model.Name = "lbl_openrouter_model" + lbl_model.PositionX = 10 + lbl_model.PositionY = y + lbl_model.Width = 80 + lbl_model.Height = 12 + lbl_model.Label = "AI Model" + dlg_model.insertByName("lbl_openrouter_model", lbl_model) + + tf_model = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf_model.Name = "tf_openrouter_model" + tf_model.PositionX = 95 + tf_model.PositionY = y + tf_model.Width = 195 + tf_model.Height = 14 + tf_model.Text = cfg.get("openrouter_model", "") + tf_model.HelpText = "openai/gpt-4.1-nano" + dlg_model.insertByName("tf_openrouter_model", tf_model) + y += 22 + + # AI Instructions (multi-line) + lbl_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_instr.Name = "lbl_openrouter_instructions" + lbl_instr.PositionX = 10 + lbl_instr.PositionY = y + lbl_instr.Width = 80 + lbl_instr.Height = 12 + lbl_instr.Label = "AI Instructions" + dlg_model.insertByName("lbl_openrouter_instructions", lbl_instr) + + tf_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf_instr.Name = "tf_openrouter_instructions" + tf_instr.PositionX = 95 + tf_instr.PositionY = y + tf_instr.Width = 195 + tf_instr.Height = 56 + tf_instr.Text = cfg.get("openrouter_instructions", "") + tf_instr.MultiLine = True + tf_instr.VScroll = True + tf_instr.HelpText = "Custom system prompt (leave blank for default)" + dlg_model.insertByName("tf_openrouter_instructions", tf_instr) + y += 62 + + # Test connection button + btn_test = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_test.Name = "btn_test" + btn_test.PositionX = 10 + btn_test.PositionY = y + btn_test.Width = 80 + btn_test.Height = 16 + btn_test.Label = "Test Connection" + dlg_model.insertByName("btn_test", btn_test) + + # Status label + lbl_status = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_status.Name = "lbl_status" + lbl_status.PositionX = 95 + lbl_status.PositionY = y + 2 + lbl_status.Width = 195 + lbl_status.Height = 12 + lbl_status.Label = "" + dlg_model.insertByName("lbl_status", lbl_status) + y += 22 + + # OK / Cancel + btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_ok.Name = "btn_ok" + btn_ok.PositionX = 190 + btn_ok.PositionY = y + btn_ok.Width = 45 + btn_ok.Height = 16 + btn_ok.Label = "Save" + btn_ok.PushButtonType = 1 + dlg_model.insertByName("btn_ok", btn_ok) + + btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_cancel.Name = "btn_cancel" + btn_cancel.PositionX = 240 + btn_cancel.PositionY = y + btn_cancel.Width = 45 + btn_cancel.Height = 16 + btn_cancel.Label = "Cancel" + btn_cancel.PushButtonType = 2 + dlg_model.insertByName("btn_cancel", btn_cancel) + + dlg_model.Height = y + 26 + + dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) + dlg.setModel(dlg_model) + dlg.setVisible(False) + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + dlg.createPeer(toolkit, None) + + result = dlg.execute() + if result == 1: + for _, name, _ in fields: + cfg[name] = dlg.getControl(f"tf_{name}").getText() + cfg["ssl_verify"] = bool(dlg.getControl("cb_ssl_verify").getModel().State) + cfg["openrouter_api_key"] = dlg.getControl("tf_openrouter_api_key").getText() + cfg["openrouter_model"] = dlg.getControl("tf_openrouter_model").getText() + cfg["openrouter_instructions"] = dlg.getControl( + "tf_openrouter_instructions" + ).getText() + _settings.save(cfg) + dlg.dispose() + return True + + dlg.dispose() + return False + + +# --------------------------------------------------------------------------- +# Push summary dialog +# --------------------------------------------------------------------------- + + +def show_push_summary( + new_count: int, + modified_count: int, + conflict_count: int, + unchanged_count: int, + parent=None, +) -> bool: + """Show push summary and return True if user confirms.""" + lines = [ + f"New items: {new_count}", + f"Modified items: {modified_count}", + f"Conflicts: {conflict_count}", + f"Unchanged: {unchanged_count}", + ] + if conflict_count: + lines.append("\nConflicts must be resolved before pushing.") + + msg = "\n".join(lines) + if conflict_count: + _msgbox(parent, "Silo Push -- Conflicts Found", msg, box_type="errorbox") + return False + + if new_count == 0 and modified_count == 0: + _msgbox(parent, "Silo Push", "Nothing to push -- all rows are up to date.") + return False + + # Confirmation -- for now use a simple info box (OK = proceed) + _msgbox(parent, "Silo Push", f"Ready to push:\n\n{msg}\n\nProceed?") + return True + + +# --------------------------------------------------------------------------- +# PN Conflict Resolution dialog +# --------------------------------------------------------------------------- + +# Return values +PN_USE_EXISTING = "use_existing" +PN_CREATE_NEW = "create_new" +PN_CANCEL = "cancel" + + +def show_pn_conflict_dialog( + part_number: str, + existing_item: Dict[str, Any], + parent=None, +) -> str: + """Show PN conflict dialog when a manually entered PN already exists. + + Returns one of: PN_USE_EXISTING, PN_CREATE_NEW, PN_CANCEL. + """ + if not _HAS_UNO: + return PN_CANCEL + + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + + dlg_model = smgr.createInstanceWithContext( + "com.sun.star.awt.UnoControlDialogModel", ctx + ) + dlg_model.Width = 320 + dlg_model.Height = 220 + dlg_model.Title = f"Part Number Conflict: {part_number}" + + y = 10 + info_lines = [ + "This part number already exists in Silo:", + "", + f" Description: {existing_item.get('description', '')}", + f" Type: {existing_item.get('item_type', '')}", + f" Category: {existing_item.get('part_number', '')[:3]}", + f" Sourcing: {existing_item.get('sourcing_type', '')}", + f" Cost: ${existing_item.get('standard_cost', 0):.2f}", + ] + + for line in info_lines: + lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl.Name = f"info_{y}" + lbl.PositionX = 10 + lbl.PositionY = y + lbl.Width = 300 + lbl.Height = 12 + lbl.Label = line + dlg_model.insertByName(f"info_{y}", lbl) + y += 13 + + y += 5 + + # Radio buttons + rb_use = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") + rb_use.Name = "rb_use" + rb_use.PositionX = 20 + rb_use.PositionY = y + rb_use.Width = 280 + rb_use.Height = 14 + rb_use.Label = "Use existing item (add to BOM)" + rb_use.State = 1 # selected by default + dlg_model.insertByName("rb_use", rb_use) + y += 18 + + rb_new = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") + rb_new.Name = "rb_new" + rb_new.PositionX = 20 + rb_new.PositionY = y + rb_new.Width = 280 + rb_new.Height = 14 + rb_new.Label = "Create new item (auto-generate PN)" + rb_new.State = 0 + dlg_model.insertByName("rb_new", rb_new) + y += 18 + + rb_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel") + rb_cancel.Name = "rb_cancel" + rb_cancel.PositionX = 20 + rb_cancel.PositionY = y + rb_cancel.Width = 280 + rb_cancel.Height = 14 + rb_cancel.Label = "Cancel" + rb_cancel.State = 0 + dlg_model.insertByName("rb_cancel", rb_cancel) + y += 25 + + # OK button + btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_ok.Name = "btn_ok" + btn_ok.PositionX = 210 + btn_ok.PositionY = y + btn_ok.Width = 45 + btn_ok.Height = 16 + btn_ok.Label = "OK" + btn_ok.PushButtonType = 1 + dlg_model.insertByName("btn_ok", btn_ok) + + btn_cancel_btn = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_cancel_btn.Name = "btn_cancel_btn" + btn_cancel_btn.PositionX = 260 + btn_cancel_btn.PositionY = y + btn_cancel_btn.Width = 45 + btn_cancel_btn.Height = 16 + btn_cancel_btn.Label = "Cancel" + btn_cancel_btn.PushButtonType = 2 + dlg_model.insertByName("btn_cancel_btn", btn_cancel_btn) + + dlg_model.Height = y + 26 + + dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) + dlg.setModel(dlg_model) + dlg.setVisible(False) + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + dlg.createPeer(toolkit, None) + + result = dlg.execute() + if result != 1: + dlg.dispose() + return PN_CANCEL + + if dlg.getControl("rb_use").getModel().State: + dlg.dispose() + return PN_USE_EXISTING + if dlg.getControl("rb_new").getModel().State: + dlg.dispose() + return PN_CREATE_NEW + + dlg.dispose() + return PN_CANCEL + + +# --------------------------------------------------------------------------- +# AI Description review dialog +# --------------------------------------------------------------------------- + + +def show_ai_description_dialog( + seller_description: str, ai_description: str, parent=None +) -> Optional[str]: + """Show AI-generated description for review/editing. + + Side-by-side layout: seller description (read-only) on the left, + AI-generated description (editable) on the right. + + Returns the accepted/edited description text, or None on cancel. + """ + if not _HAS_UNO: + return None + + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + + dlg_model = smgr.createInstanceWithContext( + "com.sun.star.awt.UnoControlDialogModel", ctx + ) + dlg_model.Width = 400 + dlg_model.Height = 210 + dlg_model.Title = "AI Description Review" + + # Left: Seller Description (read-only) + lbl_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_seller.Name = "lbl_seller" + lbl_seller.PositionX = 10 + lbl_seller.PositionY = 8 + lbl_seller.Width = 185 + lbl_seller.Height = 12 + lbl_seller.Label = "Seller Description" + dlg_model.insertByName("lbl_seller", lbl_seller) + + tf_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf_seller.Name = "tf_seller" + tf_seller.PositionX = 10 + tf_seller.PositionY = 22 + tf_seller.Width = 185 + tf_seller.Height = 140 + tf_seller.Text = seller_description + tf_seller.MultiLine = True + tf_seller.VScroll = True + tf_seller.ReadOnly = True + dlg_model.insertByName("tf_seller", tf_seller) + + # Right: Generated Description (editable) + lbl_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel") + lbl_gen.Name = "lbl_gen" + lbl_gen.PositionX = 205 + lbl_gen.PositionY = 8 + lbl_gen.Width = 185 + lbl_gen.Height = 12 + lbl_gen.Label = "Generated Description (editable)" + dlg_model.insertByName("lbl_gen", lbl_gen) + + tf_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel") + tf_gen.Name = "tf_gen" + tf_gen.PositionX = 205 + tf_gen.PositionY = 22 + tf_gen.Width = 185 + tf_gen.Height = 140 + tf_gen.Text = ai_description + tf_gen.MultiLine = True + tf_gen.VScroll = True + dlg_model.insertByName("tf_gen", tf_gen) + + # Accept button + btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_ok.Name = "btn_ok" + btn_ok.PositionX = 290 + btn_ok.PositionY = 175 + btn_ok.Width = 50 + btn_ok.Height = 18 + btn_ok.Label = "Accept" + btn_ok.PushButtonType = 1 # OK + dlg_model.insertByName("btn_ok", btn_ok) + + # Cancel button + btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel") + btn_cancel.Name = "btn_cancel" + btn_cancel.PositionX = 345 + btn_cancel.PositionY = 175 + btn_cancel.Width = 45 + btn_cancel.Height = 18 + btn_cancel.Label = "Cancel" + btn_cancel.PushButtonType = 2 # CANCEL + dlg_model.insertByName("btn_cancel", btn_cancel) + + dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx) + dlg.setModel(dlg_model) + dlg.setVisible(False) + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + dlg.createPeer(toolkit, None) + + result = dlg.execute() + if result == 1: # OK / Accept + text = dlg.getControl("tf_gen").getText() + dlg.dispose() + return text + + dlg.dispose() + return None + + +# --------------------------------------------------------------------------- +# Assembly / Project picker dialogs +# --------------------------------------------------------------------------- + + +def show_assembly_picker(client: SiloClient, parent=None) -> Optional[str]: + """Show a dialog to pick an assembly by PN. Returns the PN or None.""" + pn = _input_box("Pull BOM", "Assembly part number (e.g. A01-0003):") + return pn if pn and pn.strip() else None + + +def show_project_picker(client: SiloClient, parent=None) -> Optional[str]: + """Show a dialog to pick a project code. Returns the code or None.""" + try: + projects = client.get_projects() + except RuntimeError: + projects = [] + + if not projects: + code = _input_box("Pull Project", "Project code:") + return code if code and code.strip() else None + + # Build a choice list + choices = [f"{p.get('code', '')} - {p.get('name', '')}" for p in projects] + # For simplicity, use an input box with hint. A proper list picker + # would use a ListBox control, but this is functional for now. + hint = "Available: " + ", ".join(p.get("code", "") for p in projects) + code = _input_box("Pull Project", f"Project code ({hint}):") + return code if code and code.strip() else None diff --git a/pythonpath/silo_calc/project_files.py b/pythonpath/silo_calc/project_files.py new file mode 100644 index 0000000..97bfdce --- /dev/null +++ b/pythonpath/silo_calc/project_files.py @@ -0,0 +1,76 @@ +"""Local project file management for ODS workbooks. + +Mirrors the FreeCAD file path pattern from ``pkg/freecad/silo_commands.py``. +Project ODS files live at:: + + ~/projects/sheets/{PROJECT_CODE}/{PROJECT_CODE}.ods + +The ``SILO_PROJECTS_DIR`` env var (shared with the FreeCAD workbench) +controls the base directory. +""" + +import os +from pathlib import Path +from typing import Optional + +from . import settings as _settings + + +def get_sheets_dir() -> Path: + """Return the base directory for ODS project sheets.""" + return _settings.get_projects_dir() / "sheets" + + +def get_project_sheet_path(project_code: str) -> Path: + """Canonical path for a project workbook. + + Example: ``~/projects/sheets/3DX10/3DX10.ods`` + """ + return get_sheets_dir() / project_code / f"{project_code}.ods" + + +def ensure_project_dir(project_code: str) -> Path: + """Create the project sheet directory if needed and return its path.""" + d = get_sheets_dir() / project_code + d.mkdir(parents=True, exist_ok=True) + return d + + +def project_sheet_exists(project_code: str) -> bool: + """Check whether a project workbook already exists locally.""" + return get_project_sheet_path(project_code).is_file() + + +def save_project_sheet(project_code: str, ods_bytes: bytes) -> Path: + """Write ODS bytes to the canonical project path. + + Returns the Path written to. + """ + ensure_project_dir(project_code) + path = get_project_sheet_path(project_code) + with open(path, "wb") as f: + f.write(ods_bytes) + return path + + +def read_project_sheet(project_code: str) -> Optional[bytes]: + """Read ODS bytes from the canonical project path, or None.""" + path = get_project_sheet_path(project_code) + if not path.is_file(): + return None + with open(path, "rb") as f: + return f.read() + + +def list_project_sheets() -> list: + """Return a list of (project_code, path) tuples for all local sheets.""" + sheets_dir = get_sheets_dir() + results = [] + if not sheets_dir.is_dir(): + return results + for entry in sorted(sheets_dir.iterdir()): + if entry.is_dir(): + ods = entry / f"{entry.name}.ods" + if ods.is_file(): + results.append((entry.name, ods)) + return results diff --git a/pythonpath/silo_calc/pull.py b/pythonpath/silo_calc/pull.py new file mode 100644 index 0000000..aa5bdc5 --- /dev/null +++ b/pythonpath/silo_calc/pull.py @@ -0,0 +1,542 @@ +"""Pull commands -- populate LibreOffice Calc sheets from Silo API data. + +This module handles the UNO cell-level work for SiloPullBOM and +SiloPullProject. It fetches data via the SiloClient, then writes +cells with proper formatting, formulas, hidden columns, and row +hash tracking. +""" + +from typing import Any, Dict, List, Optional + +from . import sheet_format as sf +from . import sync_engine +from .client import SiloClient + +# UNO imports -- only available inside LibreOffice +try: + import uno + from com.sun.star.beans import PropertyValue + from com.sun.star.table import CellHoriJustify + + _HAS_UNO = True +except ImportError: + _HAS_UNO = False + +# --------------------------------------------------------------------------- +# Colour helpers (UNO uses 0xRRGGBB integers) +# --------------------------------------------------------------------------- + + +def _rgb_int(r: int, g: int, b: int) -> int: + return (r << 16) | (g << 8) | b + + +_HEADER_BG = _rgb_int(68, 114, 196) # steel blue +_HEADER_FG = _rgb_int(255, 255, 255) # white text + +_STATUS_COLORS = {k: _rgb_int(*v) for k, v in sf.STATUS_COLORS.items()} + +# --------------------------------------------------------------------------- +# Cell writing helpers +# --------------------------------------------------------------------------- + + +def _set_cell_string(sheet, col: int, row: int, value: str): + cell = sheet.getCellByPosition(col, row) + cell.setString(str(value) if value else "") + + +def _set_cell_float(sheet, col: int, row: int, value, fmt: str = ""): + cell = sheet.getCellByPosition(col, row) + try: + cell.setValue(float(value)) + except (ValueError, TypeError): + cell.setString(str(value) if value else "") + + +def _set_cell_formula(sheet, col: int, row: int, formula: str): + cell = sheet.getCellByPosition(col, row) + cell.setFormula(formula) + + +def _set_row_bg(sheet, row: int, col_count: int, color: int): + """Set background colour on an entire row.""" + rng = sheet.getCellRangeByPosition(0, row, col_count - 1, row) + rng.CellBackColor = color + + +def _format_header_row(sheet, col_count: int): + """Bold white text on blue background for row 0.""" + rng = sheet.getCellRangeByPosition(0, 0, col_count - 1, 0) + rng.CellBackColor = _HEADER_BG + rng.CharColor = _HEADER_FG + rng.CharWeight = 150 # com.sun.star.awt.FontWeight.BOLD + + +def _freeze_row(doc, row: int = 1): + """Freeze panes at the given row (default: freeze header).""" + ctrl = doc.getCurrentController() + ctrl.freezeAtPosition(0, row) + + +def _hide_columns(sheet, start_col: int, end_col: int): + """Hide a range of columns (inclusive).""" + cols = sheet.getColumns() + for i in range(start_col, end_col): + col = cols.getByIndex(i) + col.IsVisible = False + + +def _set_column_width(sheet, col: int, width_mm100: int): + """Set column width in 1/100 mm.""" + cols = sheet.getColumns() + c = cols.getByIndex(col) + c.Width = width_mm100 + + +# --------------------------------------------------------------------------- +# BOM data helpers +# --------------------------------------------------------------------------- + + +def _get_meta(entry: Dict, key: str, default: str = "") -> str: + """Extract a value from a BOM entry's metadata dict.""" + meta = entry.get("metadata") or {} + val = meta.get(key, default) + return str(val) if val else default + + +def _get_meta_float(entry: Dict, key: str) -> Optional[float]: + meta = entry.get("metadata") or {} + val = meta.get(key) + if val is not None: + try: + return float(val) + except (ValueError, TypeError): + pass + return None + + +def _get_property(rev: Optional[Dict], key: str) -> str: + """Extract a property from a revision's properties dict.""" + if not rev: + return "" + props = rev.get("properties") or {} + val = props.get(key, "") + return str(val) if val else "" + + +# --------------------------------------------------------------------------- +# SiloPullBOM +# --------------------------------------------------------------------------- + + +def pull_bom( + client: SiloClient, + doc, + sheet, + assembly_pn: str, + project_code: str = "", + schema: str = "kindred-rd", +): + """Fetch an expanded BOM and populate *sheet* with formatted data. + + Parameters + ---------- + client : SiloClient + doc : XSpreadsheetDocument + sheet : XSpreadsheet (the target sheet to populate) + assembly_pn : str (top-level assembly part number) + project_code : str (project code for auto-tagging, optional) + schema : str + """ + if not _HAS_UNO: + raise RuntimeError("UNO API not available -- must run inside LibreOffice") + + # Fetch expanded BOM + bom_entries = client.get_bom_expanded(assembly_pn, depth=10) + if not bom_entries: + raise RuntimeError(f"No BOM entries found for {assembly_pn}") + + # Fetch the top-level item for the assembly name + try: + top_item = client.get_item(assembly_pn) + except RuntimeError: + top_item = {} + + # Build a cache of items and their latest revisions for property lookup + item_cache: Dict[str, Dict] = {} + rev_cache: Dict[str, Dict] = {} + + def _ensure_cached(pn: str): + if pn in item_cache: + return + try: + item_cache[pn] = client.get_item(pn) + except RuntimeError: + item_cache[pn] = {} + try: + revisions = client.get_revisions(pn) + if revisions: + rev_cache[pn] = revisions[0] # newest first + except RuntimeError: + pass + + # Pre-cache all items in the BOM + all_pns = set() + for e in bom_entries: + all_pns.add(e.get("child_part_number", "")) + all_pns.add(e.get("parent_part_number", "")) + all_pns.discard("") + for pn in all_pns: + _ensure_cached(pn) + + # -- Write header row --------------------------------------------------- + for col_idx, header in enumerate(sf.BOM_ALL_HEADERS): + _set_cell_string(sheet, col_idx, 0, header) + _format_header_row(sheet, sf.BOM_TOTAL_COLS) + + # -- Group entries by parent for section headers ------------------------ + # BOM entries come back in tree order (parent then children). + # We insert section header rows for each depth-1 sub-assembly. + + row = 1 # current write row (0 is header) + prev_parent = None + + for entry in bom_entries: + depth = entry.get("depth", 0) + child_pn = entry.get("child_part_number", "") + parent_pn = entry.get("parent_part_number", "") + child_item = item_cache.get(child_pn, {}) + child_rev = rev_cache.get(child_pn) + + # Section header: when the parent changes for depth >= 1 entries + if depth == 1 and parent_pn != prev_parent and parent_pn: + if row > 1: + # Blank separator row + row += 1 + # Sub-assembly label row + parent_item = item_cache.get(parent_pn, {}) + label = parent_item.get("description", parent_pn) + _set_cell_string(sheet, sf.COL_ITEM, row, label) + _set_cell_float(sheet, sf.COL_LEVEL, row, 0) + _set_cell_string(sheet, sf.COL_SOURCE, row, "M") + _set_cell_string(sheet, sf.COL_PN, row, parent_pn) + + # Compute sub-assembly cost from children if available + parent_cost = _compute_subassembly_cost(bom_entries, parent_pn, item_cache) + if parent_cost is not None: + _set_cell_float(sheet, sf.COL_UNIT_COST, row, parent_cost) + _set_cell_float(sheet, sf.COL_QTY, row, 1) + # Ext Cost formula + ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" + _set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) + _set_cell_string(sheet, sf.COL_SCHEMA, row, schema) + + # Sync tracking for parent row + parent_cells = [""] * sf.BOM_TOTAL_COLS + parent_cells[sf.COL_ITEM] = label + parent_cells[sf.COL_LEVEL] = "0" + parent_cells[sf.COL_SOURCE] = "M" + parent_cells[sf.COL_PN] = parent_pn + parent_cells[sf.COL_SCHEMA] = schema + sync_engine.update_row_sync_state( + parent_cells, + sync_engine.STATUS_SYNCED, + updated_at=parent_item.get("updated_at", ""), + parent_pn="", + ) + _set_cell_string(sheet, sf.COL_ROW_HASH, row, parent_cells[sf.COL_ROW_HASH]) + _set_cell_string( + sheet, sf.COL_ROW_STATUS, row, parent_cells[sf.COL_ROW_STATUS] + ) + _set_cell_string( + sheet, sf.COL_UPDATED_AT, row, parent_cells[sf.COL_UPDATED_AT] + ) + + _set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"]) + prev_parent = parent_pn + row += 1 + + # -- Write child row ----------------------------------------------- + quantity = entry.get("quantity") + unit_cost = _get_meta_float(entry, "unit_cost") + if unit_cost is None: + unit_cost = child_item.get("standard_cost") + + # Item column: blank for children (name is in the section header) + _set_cell_string(sheet, sf.COL_ITEM, row, "") + _set_cell_float(sheet, sf.COL_LEVEL, row, depth) + _set_cell_string(sheet, sf.COL_SOURCE, row, child_item.get("sourcing_type", "")) + _set_cell_string(sheet, sf.COL_PN, row, child_pn) + _set_cell_string( + sheet, sf.COL_DESCRIPTION, row, child_item.get("description", "") + ) + _set_cell_string( + sheet, sf.COL_SELLER_DESC, row, _get_meta(entry, "seller_description") + ) + + if unit_cost is not None: + _set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost) + if quantity is not None: + _set_cell_float(sheet, sf.COL_QTY, row, quantity) + + # Ext Cost formula + ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}" + _set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula) + + _set_cell_string( + sheet, sf.COL_SOURCING_LINK, row, child_item.get("sourcing_link", "") + ) + _set_cell_string(sheet, sf.COL_SCHEMA, row, schema) + + # -- Property columns ----------------------------------------------- + prop_values = _build_property_cells(child_item, child_rev, entry) + for i, val in enumerate(prop_values): + if val: + _set_cell_string(sheet, sf.COL_PROP_START + i, row, val) + + # -- Sync tracking --------------------------------------------------- + row_cells = [""] * sf.BOM_TOTAL_COLS + row_cells[sf.COL_LEVEL] = str(depth) + row_cells[sf.COL_SOURCE] = child_item.get("sourcing_type", "") + row_cells[sf.COL_PN] = child_pn + row_cells[sf.COL_DESCRIPTION] = child_item.get("description", "") + row_cells[sf.COL_SELLER_DESC] = _get_meta(entry, "seller_description") + row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else "" + row_cells[sf.COL_QTY] = str(quantity) if quantity else "" + row_cells[sf.COL_SOURCING_LINK] = child_item.get("sourcing_link", "") + row_cells[sf.COL_SCHEMA] = schema + for i, val in enumerate(prop_values): + row_cells[sf.COL_PROP_START + i] = val + + sync_engine.update_row_sync_state( + row_cells, + sync_engine.STATUS_SYNCED, + updated_at=child_item.get("updated_at", ""), + parent_pn=parent_pn, + ) + _set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH]) + _set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS]) + _set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT]) + _set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN]) + + _set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"]) + row += 1 + + # -- Formatting --------------------------------------------------------- + _freeze_row(doc, 1) + _hide_columns(sheet, sf.COL_PROP_START, sf.COL_PROP_END) # property cols + _hide_columns(sheet, sf.COL_SYNC_START, sf.BOM_TOTAL_COLS) # sync cols + + # Set reasonable column widths for visible columns (in 1/100 mm) + _WIDTHS = { + sf.COL_ITEM: 4500, + sf.COL_LEVEL: 1200, + sf.COL_SOURCE: 1500, + sf.COL_PN: 2500, + sf.COL_DESCRIPTION: 5000, + sf.COL_SELLER_DESC: 6000, + sf.COL_UNIT_COST: 2200, + sf.COL_QTY: 1200, + sf.COL_EXT_COST: 2200, + sf.COL_SOURCING_LINK: 5000, + sf.COL_SCHEMA: 1500, + } + for col, width in _WIDTHS.items(): + _set_column_width(sheet, col, width) + + # Auto-tag all items with the project (if a project code is set) + if project_code: + _auto_tag_project(client, all_pns, project_code) + + return row - 1 # number of data rows written + + +def _compute_subassembly_cost( + bom_entries: List[Dict], + parent_pn: str, + item_cache: Dict[str, Dict], +) -> Optional[float]: + """Sum unit_cost * quantity for direct children of parent_pn.""" + total = 0.0 + found = False + for e in bom_entries: + if e.get("parent_part_number") == parent_pn and e.get("depth", 0) > 0: + q = e.get("quantity") or 0 + uc = _get_meta_float(e, "unit_cost") + if uc is None: + child = item_cache.get(e.get("child_part_number", ""), {}) + uc = child.get("standard_cost") + if uc is not None: + total += float(uc) * float(q) + found = True + return total if found else None + + +def _build_property_cells( + item: Dict, rev: Optional[Dict], bom_entry: Dict +) -> List[str]: + """Build the property column values in order matching BOM_PROPERTY_HEADERS. + + Sources (priority): revision properties > BOM metadata > item fields. + """ + result = [] + for header in sf.BOM_PROPERTY_HEADERS: + db_key = sf.PROPERTY_KEY_MAP.get(header, "") + val = "" + # Check revision properties first + if db_key: + val = _get_property(rev, db_key) + # Fallback to BOM entry metadata + if not val and db_key: + val = _get_meta(bom_entry, db_key) + # Special case: Long Description from item field + if header == "Long Description" and not val: + val = item.get("long_description", "") + # Special case: Notes from item metadata or revision + if header == "Notes" and not val: + val = _get_meta(bom_entry, "notes") + result.append(str(val) if val else "") + return result + + +def _auto_tag_project( + client: SiloClient, + part_numbers: set, + project_code: str, +): + """Tag all part numbers with the given project code (skip failures).""" + for pn in part_numbers: + if not pn: + continue + try: + existing = client.get_item_projects(pn) + existing_codes = ( + {p.get("code", "") for p in existing} + if isinstance(existing, list) + else set() + ) + if project_code not in existing_codes: + client.add_item_projects(pn, [project_code]) + except RuntimeError: + pass # Best-effort tagging + + +# --------------------------------------------------------------------------- +# SiloPullProject +# --------------------------------------------------------------------------- + + +def pull_project( + client: SiloClient, + doc, + project_code: str, + schema: str = "kindred-rd", +): + """Fetch project items and populate an Items sheet. + + Also attempts to find an assembly and populate a BOM sheet. + """ + if not _HAS_UNO: + raise RuntimeError("UNO API not available") + + items = client.get_project_items(project_code) + if not items: + raise RuntimeError(f"No items found for project {project_code}") + + sheets = doc.getSheets() + + # -- Items sheet -------------------------------------------------------- + if sheets.hasByName("Items"): + items_sheet = sheets.getByName("Items") + else: + sheets.insertNewByName("Items", sheets.getCount()) + items_sheet = sheets.getByName("Items") + + # Header + for col_idx, header in enumerate(sf.ITEMS_HEADERS): + _set_cell_string(items_sheet, col_idx, 0, header) + header_range = items_sheet.getCellRangeByPosition( + 0, 0, len(sf.ITEMS_HEADERS) - 1, 0 + ) + header_range.CellBackColor = _HEADER_BG + header_range.CharColor = _HEADER_FG + header_range.CharWeight = 150 + + for row_idx, item in enumerate(items, start=1): + _set_cell_string(items_sheet, 0, row_idx, item.get("part_number", "")) + _set_cell_string(items_sheet, 1, row_idx, item.get("description", "")) + _set_cell_string(items_sheet, 2, row_idx, item.get("item_type", "")) + _set_cell_string(items_sheet, 3, row_idx, item.get("sourcing_type", "")) + _set_cell_string(items_sheet, 4, row_idx, schema) + cost = item.get("standard_cost") + if cost is not None: + _set_cell_float(items_sheet, 5, row_idx, cost) + _set_cell_string(items_sheet, 6, row_idx, item.get("sourcing_link", "")) + _set_cell_string(items_sheet, 7, row_idx, item.get("long_description", "")) + + # Properties from latest revision (if available) + rev = None + try: + revisions = client.get_revisions(item.get("part_number", "")) + if revisions: + rev = revisions[0] + except RuntimeError: + pass + + prop_cols = [ + "manufacturer", + "manufacturer_pn", + "supplier", + "supplier_pn", + "lead_time_days", + "minimum_order_qty", + "lifecycle_status", + "rohs_compliant", + "country_of_origin", + "material", + "finish", + "notes", + ] + for pi, prop_key in enumerate(prop_cols): + val = _get_property(rev, prop_key) + if val: + _set_cell_string(items_sheet, 8 + pi, row_idx, val) + + _set_cell_string( + items_sheet, + 20, + row_idx, + item.get("created_at", "")[:10] if item.get("created_at") else "", + ) + _set_cell_string( + items_sheet, + 21, + row_idx, + item.get("updated_at", "")[:10] if item.get("updated_at") else "", + ) + + # Freeze header + _freeze_row(doc, 1) + + # -- BOM sheet (if we can find an assembly) ----------------------------- + assemblies = [i for i in items if i.get("item_type") == "assembly"] + if assemblies: + top_assembly = assemblies[0] + top_pn = top_assembly.get("part_number", "") + + if sheets.hasByName("BOM"): + bom_sheet = sheets.getByName("BOM") + else: + sheets.insertNewByName("BOM", 0) + bom_sheet = sheets.getByName("BOM") + + try: + pull_bom( + client, doc, bom_sheet, top_pn, project_code=project_code, schema=schema + ) + except RuntimeError: + pass # BOM sheet stays empty if fetch fails + + return len(items) diff --git a/pythonpath/silo_calc/push.py b/pythonpath/silo_calc/push.py new file mode 100644 index 0000000..fb1dc43 --- /dev/null +++ b/pythonpath/silo_calc/push.py @@ -0,0 +1,431 @@ +"""Push command -- sync local BOM edits back to the Silo database. + +Handles: +- Row classification (new / modified / synced / conflict) +- Creating new items via the API +- Updating existing items and BOM entry metadata +- Auto-tagging new items with the project code +- Conflict detection against server timestamps +- Updating row sync state after successful push +""" + +from typing import Any, Dict, List, Optional, Tuple + +from . import sheet_format as sf +from . import sync_engine +from .client import SiloClient + +# UNO imports +try: + import uno + + _HAS_UNO = True +except ImportError: + _HAS_UNO = False + + +def _read_sheet_rows(sheet) -> List[List[str]]: + """Read all rows from a sheet as lists of strings.""" + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + addr = cursor.getRangeAddress() + end_row = addr.EndRow + end_col = max(addr.EndColumn, sf.BOM_TOTAL_COLS - 1) + + rows = [] + for r in range(end_row + 1): + row_cells = [] + for c in range(end_col + 1): + cell = sheet.getCellByPosition(c, r) + # Get display string for all cell types + val = cell.getString() + row_cells.append(val) + # Pad to full width + while len(row_cells) < sf.BOM_TOTAL_COLS: + row_cells.append("") + rows.append(row_cells) + return rows + + +def _detect_project_code(doc) -> str: + """Try to detect the project code from the file path.""" + try: + file_url = doc.getURL() + if file_url: + file_path = uno.fileUrlToSystemPath(file_url) + parts = file_path.replace("\\", "/").split("/") + if "sheets" in parts: + idx = parts.index("sheets") + if idx + 1 < len(parts): + return parts[idx + 1] + except Exception: + pass + return "" + + +def _fetch_server_timestamps( + client: SiloClient, part_numbers: List[str] +) -> Dict[str, str]: + """Fetch updated_at timestamps for a list of part numbers.""" + timestamps = {} + for pn in part_numbers: + if not pn: + continue + try: + item = client.get_item(pn) + timestamps[pn] = item.get("updated_at", "") + except RuntimeError: + pass + return timestamps + + +# --------------------------------------------------------------------------- +# Push execution +# --------------------------------------------------------------------------- + + +def push_sheet( + client: SiloClient, + doc, + sheet, + schema: str = "kindred-rd", +) -> Dict[str, Any]: + """Execute a push for the active BOM sheet. + + Returns a summary dict with counts and any errors. + """ + if not _HAS_UNO: + raise RuntimeError("UNO API not available") + + rows = _read_sheet_rows(sheet) + if not rows: + return {"created": 0, "updated": 0, "errors": [], "skipped": 0} + + project_code = _detect_project_code(doc) + + # Classify all rows + classified = sync_engine.classify_rows(rows) + + # Collect part numbers for server timestamp check + modified_pns = [ + cells[sf.COL_PN].strip() + for _, status, cells in classified + if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip() + ] + server_ts = _fetch_server_timestamps(client, modified_pns) + + # Build diff + diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts) + + results = { + "created": 0, + "updated": 0, + "errors": [], + "skipped": diff["unchanged"], + "conflicts": len(diff["conflicts"]), + } + + # -- Handle new rows: create items in the database ---------------------- + for row_info in diff["new"]: + row_idx = row_info["row_index"] + cells = rows[row_idx] + pn = cells[sf.COL_PN].strip() + desc = cells[sf.COL_DESCRIPTION].strip() + source = cells[sf.COL_SOURCE].strip() + sourcing_link = cells[sf.COL_SOURCING_LINK].strip() + unit_cost_str = cells[sf.COL_UNIT_COST].strip() + qty_str = cells[sf.COL_QTY].strip() + parent_pn = ( + cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else "" + ) + + unit_cost = None + if unit_cost_str: + try: + unit_cost = float(unit_cost_str.replace("$", "").replace(",", "")) + except ValueError: + pass + + qty = 1.0 + if qty_str: + try: + qty = float(qty_str) + except ValueError: + pass + + if not desc: + results["errors"].append( + f"Row {row_idx + 1}: description is required for new items" + ) + _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) + continue + + try: + if pn: + # Check if item already exists + try: + existing = client.get_item(pn) + # Item exists -- just update BOM relationship if parent is known + if parent_pn: + _update_bom_relationship( + client, parent_pn, pn, qty, unit_cost, cells + ) + results["updated"] += 1 + _update_row_after_push(sheet, rows, row_idx, existing) + continue + except RuntimeError: + pass # Item doesn't exist, create it + + # Detect category from PN prefix (e.g., F01-0001 -> F01) + category = pn[:3] if pn and len(pn) >= 3 else "" + + # Create the item + create_data = { + "schema": schema, + "category": category, + "description": desc, + } + if source: + create_data["sourcing_type"] = source + if sourcing_link: + create_data["sourcing_link"] = sourcing_link + if unit_cost is not None: + create_data["standard_cost"] = unit_cost + if project_code: + create_data["projects"] = [project_code] + + created = client.create_item(**create_data) + created_pn = created.get("part_number", pn) + + # Update the PN cell if it was auto-generated + if not pn and created_pn: + from . import pull as _pull + + _pull._set_cell_string(sheet, sf.COL_PN, row_idx, created_pn) + cells[sf.COL_PN] = created_pn + + # Add to parent's BOM if parent is known + if parent_pn: + _update_bom_relationship( + client, parent_pn, created_pn, qty, unit_cost, cells + ) + + # Auto-tag with project + if project_code: + try: + client.add_item_projects(created_pn, [project_code]) + except RuntimeError: + pass + + # Set property columns via revision update (if any properties set) + _push_properties(client, created_pn, cells) + + results["created"] += 1 + _update_row_after_push(sheet, rows, row_idx, created) + + except RuntimeError as e: + results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}") + _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) + + # -- Handle modified rows: update items --------------------------------- + for row_info in diff["modified"]: + row_idx = row_info["row_index"] + cells = rows[row_idx] + pn = cells[sf.COL_PN].strip() + parent_pn = ( + cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else "" + ) + + if not pn: + results["errors"].append( + f"Row {row_idx + 1}: no part number for modified row" + ) + continue + + try: + # Update item fields + update_fields = {} + desc = cells[sf.COL_DESCRIPTION].strip() + if desc: + update_fields["description"] = desc + source = cells[sf.COL_SOURCE].strip() + if source: + update_fields["sourcing_type"] = source + sourcing_link = cells[sf.COL_SOURCING_LINK].strip() + update_fields["sourcing_link"] = sourcing_link + + unit_cost_str = cells[sf.COL_UNIT_COST].strip() + unit_cost = None + if unit_cost_str: + try: + unit_cost = float(unit_cost_str.replace("$", "").replace(",", "")) + update_fields["standard_cost"] = unit_cost + except ValueError: + pass + + if update_fields: + updated = client.update_item(pn, **update_fields) + else: + updated = client.get_item(pn) + + # Update BOM relationship + qty_str = cells[sf.COL_QTY].strip() + qty = 1.0 + if qty_str: + try: + qty = float(qty_str) + except ValueError: + pass + + if parent_pn: + _update_bom_relationship(client, parent_pn, pn, qty, unit_cost, cells) + + # Update properties + _push_properties(client, pn, cells) + + # Auto-tag with project + if project_code: + try: + existing_projects = client.get_item_projects(pn) + existing_codes = ( + {p.get("code", "") for p in existing_projects} + if isinstance(existing_projects, list) + else set() + ) + if project_code not in existing_codes: + client.add_item_projects(pn, [project_code]) + except RuntimeError: + pass + + results["updated"] += 1 + _update_row_after_push(sheet, rows, row_idx, updated) + + except RuntimeError as e: + results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}") + _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR) + + # -- Mark conflicts ----------------------------------------------------- + for row_info in diff["conflicts"]: + row_idx = row_info["row_index"] + _set_row_status(sheet, row_idx, sync_engine.STATUS_CONFLICT) + + return results + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _update_bom_relationship( + client: SiloClient, + parent_pn: str, + child_pn: str, + qty: float, + unit_cost: Optional[float], + cells: List[str], +): + """Create or update a BOM relationship between parent and child.""" + metadata = {} + seller_desc = ( + cells[sf.COL_SELLER_DESC].strip() if len(cells) > sf.COL_SELLER_DESC else "" + ) + if seller_desc: + metadata["seller_description"] = seller_desc + if unit_cost is not None: + metadata["unit_cost"] = unit_cost + sourcing_link = ( + cells[sf.COL_SOURCING_LINK].strip() if len(cells) > sf.COL_SOURCING_LINK else "" + ) + if sourcing_link: + metadata["sourcing_link"] = sourcing_link + + try: + # Try update first (entry may already exist) + client.update_bom_entry( + parent_pn, + child_pn, + quantity=qty, + metadata=metadata if metadata else None, + ) + except RuntimeError: + # If update fails, try creating + try: + client.add_bom_entry( + parent_pn, + child_pn, + quantity=qty, + metadata=metadata if metadata else None, + ) + except RuntimeError: + pass # Best effort + + +def _push_properties(client: SiloClient, pn: str, cells: List[str]): + """Push property column values to the item's latest revision. + + Currently this is best-effort -- the API may not support bulk property + updates in a single call. Properties are stored in revision.properties + JSONB on the server side. + """ + # Collect property values from the row + properties = {} + for i, header in enumerate(sf.BOM_PROPERTY_HEADERS): + col_idx = sf.COL_PROP_START + i + if col_idx < len(cells): + val = cells[col_idx].strip() + if val: + db_key = sf.PROPERTY_KEY_MAP.get(header, "") + if db_key: + properties[db_key] = val + + if not properties: + return + + # The Silo API stores properties on revisions. For now, we'll update + # the item's long_description if it's set, and rely on the revision + # properties being set during create or via revision update. + long_desc = properties.pop("long_description", None) + if long_desc: + try: + client.update_item(pn, long_description=long_desc) + except RuntimeError: + pass + + +def _update_row_after_push( + sheet, rows: List[List[str]], row_idx: int, item: Dict[str, Any] +): + """Update sync tracking columns after a successful push.""" + from . import pull as _pull + + cells = rows[row_idx] + + # Update the PN if the server returned one (auto-generated) + server_pn = item.get("part_number", "") + if server_pn and not cells[sf.COL_PN].strip(): + cells[sf.COL_PN] = server_pn + _pull._set_cell_string(sheet, sf.COL_PN, row_idx, server_pn) + + # Recompute hash and set synced status + sync_engine.update_row_sync_state( + cells, + sync_engine.STATUS_SYNCED, + updated_at=item.get("updated_at", ""), + ) + _pull._set_cell_string(sheet, sf.COL_ROW_HASH, row_idx, cells[sf.COL_ROW_HASH]) + _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, cells[sf.COL_ROW_STATUS]) + _pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row_idx, cells[sf.COL_UPDATED_AT]) + + _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, _pull._STATUS_COLORS["synced"]) + + +def _set_row_status(sheet, row_idx: int, status: str): + """Set just the status cell and row colour.""" + from . import pull as _pull + + _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, status) + color = _pull._STATUS_COLORS.get(status) + if color: + _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, color) diff --git a/pythonpath/silo_calc/settings.py b/pythonpath/silo_calc/settings.py new file mode 100644 index 0000000..ce8438f --- /dev/null +++ b/pythonpath/silo_calc/settings.py @@ -0,0 +1,94 @@ +"""Persistent settings for the Silo Calc extension. + +Settings are stored in ``~/.config/silo/calc-settings.json``. +The file is a flat JSON dict with known keys. +""" + +import json +import os +from pathlib import Path +from typing import Any, Dict + +_SETTINGS_DIR = ( + Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")).expanduser() / "silo" +) +_SETTINGS_FILE = _SETTINGS_DIR / "calc-settings.json" + +# Default values for every known key. +_DEFAULTS: Dict[str, Any] = { + "api_url": "", + "api_token": "", + "ssl_verify": True, + "ssl_cert_path": "", + "auth_username": "", + "auth_role": "", + "auth_source": "", + "projects_dir": "", # fallback: SILO_PROJECTS_DIR env or ~/projects + "default_schema": "kindred-rd", + "openrouter_api_key": "", # fallback: OPENROUTER_API_KEY env var + "openrouter_model": "", # fallback: ai_client.DEFAULT_MODEL + "openrouter_instructions": "", # fallback: ai_client.DEFAULT_INSTRUCTIONS +} + + +def load() -> Dict[str, Any]: + """Load settings, returning defaults for any missing keys.""" + cfg = dict(_DEFAULTS) + if _SETTINGS_FILE.is_file(): + try: + with open(_SETTINGS_FILE, "r") as f: + stored = json.load(f) + cfg.update(stored) + except (json.JSONDecodeError, OSError): + pass + return cfg + + +def save(cfg: Dict[str, Any]) -> None: + """Persist the full settings dict to disk.""" + _SETTINGS_DIR.mkdir(parents=True, exist_ok=True) + with open(_SETTINGS_FILE, "w") as f: + json.dump(cfg, f, indent=2) + + +def get(key: str, default: Any = None) -> Any: + """Convenience: load a single key.""" + cfg = load() + return cfg.get(key, default) + + +def put(key: str, value: Any) -> None: + """Convenience: update a single key and persist.""" + cfg = load() + cfg[key] = value + save(cfg) + + +def save_auth(username: str, role: str = "", source: str = "", token: str = "") -> None: + """Store authentication info.""" + cfg = load() + cfg["auth_username"] = username + cfg["auth_role"] = role + cfg["auth_source"] = source + if token: + cfg["api_token"] = token + save(cfg) + + +def clear_auth() -> None: + """Remove stored auth credentials.""" + cfg = load() + cfg["api_token"] = "" + cfg["auth_username"] = "" + cfg["auth_role"] = "" + cfg["auth_source"] = "" + save(cfg) + + +def get_projects_dir() -> Path: + """Return the resolved projects base directory.""" + cfg = load() + d = cfg.get("projects_dir", "") + if not d: + d = os.environ.get("SILO_PROJECTS_DIR", "~/projects") + return Path(d).expanduser() diff --git a/pythonpath/silo_calc/sheet_format.py b/pythonpath/silo_calc/sheet_format.py new file mode 100644 index 0000000..370745b --- /dev/null +++ b/pythonpath/silo_calc/sheet_format.py @@ -0,0 +1,178 @@ +"""BOM and Items sheet column layouts, constants, and detection helpers. + +This module defines the column structure that matches the engineer's working +BOM format. Hidden property columns and sync-tracking columns are appended +to the right. +""" + +from typing import Dict, List, Optional, Tuple + +# --------------------------------------------------------------------------- +# Column indices -- BOM sheet +# --------------------------------------------------------------------------- + +# Visible core columns (always shown) +BOM_VISIBLE_HEADERS: List[str] = [ + "Item", # A - assembly label / section header + "Level", # B - depth in expanded BOM + "Source", # C - sourcing_type (M/P) + "PN", # D - part_number + "Description", # E - item description + "Seller Description", # F - metadata.seller_description + "Unit Cost", # G - standard_cost / metadata.unit_cost + "QTY", # H - quantity on relationship + "Ext Cost", # I - formula =G*H + "Sourcing Link", # J - sourcing_link + "Schema", # K - schema name +] + +# Hidden property columns (collapsed group, available when needed) +BOM_PROPERTY_HEADERS: List[str] = [ + "Manufacturer", # L + "Manufacturer PN", # M + "Supplier", # N + "Supplier PN", # O + "Lead Time (days)", # P + "Min Order Qty", # Q + "Lifecycle Status", # R + "RoHS Compliant", # S + "Country of Origin", # T + "Material", # U + "Finish", # V + "Notes", # W + "Long Description", # X +] + +# Hidden sync columns (never shown to user) +BOM_SYNC_HEADERS: List[str] = [ + "_silo_row_hash", # Y - SHA256 of row data at pull time + "_silo_row_status", # Z - synced/modified/new/error + "_silo_updated_at", # AA - server timestamp + "_silo_parent_pn", # AB - parent assembly PN for this BOM entry +] + +# All headers in order +BOM_ALL_HEADERS: List[str] = ( + BOM_VISIBLE_HEADERS + BOM_PROPERTY_HEADERS + BOM_SYNC_HEADERS +) + +# Index constants for quick access +COL_ITEM = 0 +COL_LEVEL = 1 +COL_SOURCE = 2 +COL_PN = 3 +COL_DESCRIPTION = 4 +COL_SELLER_DESC = 5 +COL_UNIT_COST = 6 +COL_QTY = 7 +COL_EXT_COST = 8 +COL_SOURCING_LINK = 9 +COL_SCHEMA = 10 + +# Property column range +COL_PROP_START = len(BOM_VISIBLE_HEADERS) # 11 +COL_PROP_END = COL_PROP_START + len(BOM_PROPERTY_HEADERS) # 24 + +# Sync column range +COL_SYNC_START = COL_PROP_END # 24 +COL_ROW_HASH = COL_SYNC_START # 24 +COL_ROW_STATUS = COL_SYNC_START + 1 # 25 +COL_UPDATED_AT = COL_SYNC_START + 2 # 26 +COL_PARENT_PN = COL_SYNC_START + 3 # 27 + +# Total column count +BOM_TOTAL_COLS = len(BOM_ALL_HEADERS) + +# --------------------------------------------------------------------------- +# Items sheet columns (flat list of all items for a project) +# --------------------------------------------------------------------------- + +ITEMS_HEADERS: List[str] = [ + "PN", + "Description", + "Type", + "Source", + "Schema", + "Standard Cost", + "Sourcing Link", + "Long Description", + "Manufacturer", + "Manufacturer PN", + "Supplier", + "Supplier PN", + "Lead Time (days)", + "Min Order Qty", + "Lifecycle Status", + "RoHS Compliant", + "Country of Origin", + "Material", + "Finish", + "Notes", + "Created", + "Updated", +] + +# --------------------------------------------------------------------------- +# Property key mapping (header name -> DB field path) +# --------------------------------------------------------------------------- + +PROPERTY_KEY_MAP: Dict[str, str] = { + "Manufacturer": "manufacturer", + "Manufacturer PN": "manufacturer_pn", + "Supplier": "supplier", + "Supplier PN": "supplier_pn", + "Lead Time (days)": "lead_time_days", + "Min Order Qty": "minimum_order_qty", + "Lifecycle Status": "lifecycle_status", + "RoHS Compliant": "rohs_compliant", + "Country of Origin": "country_of_origin", + "Material": "material", + "Finish": "finish", + "Notes": "notes", +} + +# Reverse map +DB_FIELD_TO_HEADER: Dict[str, str] = {v: k for k, v in PROPERTY_KEY_MAP.items()} + +# --------------------------------------------------------------------------- +# Row status colours (RGB tuples, 0-255) +# --------------------------------------------------------------------------- + +STATUS_COLORS: Dict[str, Tuple[int, int, int]] = { + "synced": (198, 239, 206), # light green #C6EFCE + "modified": (255, 235, 156), # light yellow #FFEB9C + "new": (189, 215, 238), # light blue #BDD7EE + "error": (255, 199, 206), # light red #FFC7CE + "conflict": (244, 176, 132), # orange #F4B084 +} + +# --------------------------------------------------------------------------- +# Sheet type detection +# --------------------------------------------------------------------------- + + +def detect_sheet_type(headers: List[str]) -> Optional[str]: + """Detect sheet type from the first row of headers. + + Returns ``"bom"``, ``"items"``, or ``None`` if unrecognised. + """ + if not headers: + return None + # Normalise for comparison + norm = [h.strip().lower() for h in headers] + if "item" in norm and "level" in norm and "qty" in norm: + return "bom" + if "pn" in norm and "type" in norm: + return "items" + return None + + +def col_letter(index: int) -> str: + """Convert 0-based column index to spreadsheet letter (A, B, ..., AA, AB).""" + result = "" + while True: + result = chr(65 + index % 26) + result + index = index // 26 - 1 + if index < 0: + break + return result diff --git a/pythonpath/silo_calc/sync_engine.py b/pythonpath/silo_calc/sync_engine.py new file mode 100644 index 0000000..e00a360 --- /dev/null +++ b/pythonpath/silo_calc/sync_engine.py @@ -0,0 +1,160 @@ +"""Row hashing, diff classification, and sync state tracking. + +Used by push/pull commands to detect which rows have been modified locally +since the last pull, and to detect conflicts with server-side changes. +""" + +import hashlib +import json +from typing import Any, Dict, List, Optional, Tuple + +from . import sheet_format as sf + +# Row statuses +STATUS_SYNCED = "synced" +STATUS_MODIFIED = "modified" +STATUS_NEW = "new" +STATUS_ERROR = "error" +STATUS_CONFLICT = "conflict" + + +def compute_row_hash(cells: List[str]) -> str: + """SHA-256 hash of the visible + property columns of a row. + + Only the data columns are hashed (not sync tracking columns). + Blank/empty cells are normalised to the empty string. + """ + # Use columns 0..COL_PROP_END-1 (visible + properties, not sync cols) + data_cells = cells[: sf.COL_PROP_END] + # Normalise + normalised = [str(c).strip() if c else "" for c in data_cells] + raw = "\t".join(normalised).encode("utf-8") + return hashlib.sha256(raw).hexdigest() + + +def classify_row(cells: List[str]) -> str: + """Return the sync status of a single row. + + Reads the stored hash and current cell values to determine whether + the row is synced, modified, new, or in an error state. + """ + # Ensure we have enough columns + while len(cells) < sf.BOM_TOTAL_COLS: + cells.append("") + + stored_hash = cells[sf.COL_ROW_HASH].strip() if cells[sf.COL_ROW_HASH] else "" + stored_status = cells[sf.COL_ROW_STATUS].strip() if cells[sf.COL_ROW_STATUS] else "" + + # No hash -> new row (never pulled from server) + if not stored_hash: + # Check if there's any data in the row + has_data = any( + cells[i].strip() + for i in range(sf.COL_PROP_END) + if i < len(cells) and cells[i] + ) + return STATUS_NEW if has_data else "" + + # Compute current hash and compare + current_hash = compute_row_hash(cells) + if current_hash == stored_hash: + return STATUS_SYNCED + return STATUS_MODIFIED + + +def classify_rows(all_rows: List[List[str]]) -> List[Tuple[int, str, List[str]]]: + """Classify every row in a sheet. + + Returns list of ``(row_index, status, cells)`` for rows that have data. + Blank separator rows and the header row (index 0) are skipped. + """ + results = [] + for i, cells in enumerate(all_rows): + if i == 0: + continue # header row + status = classify_row(list(cells)) + if status: + results.append((i, status, list(cells))) + return results + + +def build_push_diff( + classified: List[Tuple[int, str, List[str]]], + server_timestamps: Optional[Dict[str, str]] = None, +) -> Dict[str, List[Dict[str, Any]]]: + """Build a push diff from classified rows. + + *server_timestamps* maps part numbers to their server ``updated_at`` + values, used for conflict detection. + + Returns a dict with keys ``new``, ``modified``, ``conflicts``, and + the count of ``unchanged`` rows. + """ + server_ts = server_timestamps or {} + new_rows = [] + modified_rows = [] + conflicts = [] + unchanged = 0 + + for row_idx, status, cells in classified: + if status == STATUS_SYNCED: + unchanged += 1 + continue + + pn = cells[sf.COL_PN].strip() if len(cells) > sf.COL_PN else "" + stored_ts = ( + cells[sf.COL_UPDATED_AT].strip() + if len(cells) > sf.COL_UPDATED_AT and cells[sf.COL_UPDATED_AT] + else "" + ) + + row_info = { + "row_index": row_idx, + "part_number": pn, + "description": cells[sf.COL_DESCRIPTION].strip() + if len(cells) > sf.COL_DESCRIPTION + else "", + "cells": cells[: sf.COL_PROP_END], + } + + if status == STATUS_NEW: + new_rows.append(row_info) + elif status == STATUS_MODIFIED: + # Check for conflict: server changed since we pulled + server_updated = server_ts.get(pn, "") + if stored_ts and server_updated and server_updated != stored_ts: + row_info["local_ts"] = stored_ts + row_info["server_ts"] = server_updated + conflicts.append(row_info) + else: + modified_rows.append(row_info) + + return { + "new": new_rows, + "modified": modified_rows, + "conflicts": conflicts, + "unchanged": unchanged, + } + + +def update_row_sync_state( + cells: List[str], + status: str, + updated_at: str = "", + parent_pn: str = "", +) -> List[str]: + """Set the sync tracking columns on a row and return it. + + Recomputes the row hash from current visible+property data. + """ + while len(cells) < sf.BOM_TOTAL_COLS: + cells.append("") + + cells[sf.COL_ROW_HASH] = compute_row_hash(cells) + cells[sf.COL_ROW_STATUS] = status + if updated_at: + cells[sf.COL_UPDATED_AT] = updated_at + if parent_pn: + cells[sf.COL_PARENT_PN] = parent_pn + + return cells diff --git a/silo-client b/silo-client new file mode 160000 index 0000000..a6ac3d4 --- /dev/null +++ b/silo-client @@ -0,0 +1 @@ +Subproject commit a6ac3d4d0664d160c49e145dead5e790e8e723f3 diff --git a/silo_calc_component.py b/silo_calc_component.py new file mode 100644 index 0000000..330498d --- /dev/null +++ b/silo_calc_component.py @@ -0,0 +1,501 @@ +"""UNO ProtocolHandler component for the Silo Calc extension. + +This file is registered in META-INF/manifest.xml and acts as the entry +point for all toolbar / menu commands. Each custom protocol URL +dispatches to a handler function that orchestrates the corresponding +feature. + +All silo_calc submodule imports are deferred to handler call time so +that the component registration always succeeds even if a submodule +has issues. +""" + +import os +import sys +import traceback + +import uno +import unohelper +from com.sun.star.frame import XDispatch, XDispatchProvider +from com.sun.star.lang import XInitialization, XServiceInfo + +# Ensure pythonpath/ is importable +_ext_dir = os.path.dirname(os.path.abspath(__file__)) +_pypath = os.path.join(_ext_dir, "pythonpath") +if _pypath not in sys.path: + sys.path.insert(0, _pypath) + +# Ensure silo-client package is importable +_client_path = os.path.join(_ext_dir, "silo-client") +if _client_path not in sys.path: + sys.path.insert(0, _client_path) + +# Service identifiers +_IMPL_NAME = "io.kindredsystems.silo.calc.Component" +_SERVICE_NAME = "com.sun.star.frame.ProtocolHandler" +_PROTOCOL = "io.kindredsystems.silo.calc:" + + +def _log(msg: str): + """Print to the LibreOffice terminal / stderr.""" + print(f"[Silo Calc] {msg}") + + +def _get_desktop(): + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx) + + +def _get_active_sheet(): + """Return (doc, sheet) for the current spreadsheet, or (None, None).""" + desktop = _get_desktop() + doc = desktop.getCurrentComponent() + if doc is None: + return None, None + if not doc.supportsService("com.sun.star.sheet.SpreadsheetDocument"): + return None, None + sheet = doc.getSheets().getByIndex( + doc.getCurrentController().getActiveSheet().getRangeAddress().Sheet + ) + return doc, sheet + + +def _msgbox(title, message, box_type="infobox"): + """Lightweight message box that doesn't depend on dialogs module.""" + ctx = uno.getComponentContext() + smgr = ctx.ServiceManager + toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx) + parent = _get_desktop().getCurrentFrame().getContainerWindow() + mbt = uno.Enum( + "com.sun.star.awt.MessageBoxType", + "INFOBOX" if box_type == "infobox" else "ERRORBOX", + ) + box = toolkit.createMessageBox(parent, mbt, 1, title, message) + box.execute() + + +# --------------------------------------------------------------------------- +# Command handlers -- imports are deferred to call time +# --------------------------------------------------------------------------- + + +def _cmd_login(frame): + from silo_calc import dialogs + + dialogs.show_login_dialog() + + +def _cmd_settings(frame): + from silo_calc import dialogs + + dialogs.show_settings_dialog() + + +def _cmd_pull_bom(frame): + """Pull a BOM from the server and populate the active sheet.""" + from silo_calc import dialogs, project_files + from silo_calc import pull as _pull + from silo_calc import settings as _settings + from silo_calc.client import SiloClient + + client = SiloClient() + if not client.is_authenticated(): + dialogs.show_login_dialog() + client = SiloClient() # reload after login + if not client.is_authenticated(): + return + + pn = dialogs.show_assembly_picker(client) + if not pn: + return + + project_code = ( + dialogs._input_box("Pull BOM", "Project code for auto-tagging (optional):") + or "" + ) + + doc, sheet = _get_active_sheet() + if doc is None: + desktop = _get_desktop() + doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ()) + sheet = doc.getSheets().getByIndex(0) + sheet.setName("BOM") + + try: + count = _pull.pull_bom( + client, + doc, + sheet, + pn, + project_code=project_code.strip(), + schema=_settings.get("default_schema", "kindred-rd"), + ) + _log(f"Pulled BOM for {pn}: {count} rows") + + if project_code.strip(): + path = project_files.get_project_sheet_path(project_code.strip()) + project_files.ensure_project_dir(project_code.strip()) + url = uno.systemPathToFileUrl(str(path)) + doc.storeToURL(url, ()) + _log(f"Saved to {path}") + + _msgbox("Pull BOM", f"Pulled {count} rows for {pn}.") + except RuntimeError as e: + _msgbox("Pull BOM Failed", str(e), box_type="errorbox") + + +def _cmd_pull_project(frame): + """Pull all project items as a multi-sheet workbook.""" + from silo_calc import dialogs, project_files + from silo_calc import pull as _pull + from silo_calc import settings as _settings + from silo_calc.client import SiloClient + + client = SiloClient() + if not client.is_authenticated(): + dialogs.show_login_dialog() + client = SiloClient() + if not client.is_authenticated(): + return + + code = dialogs.show_project_picker(client) + if not code: + return + + doc, _ = _get_active_sheet() + if doc is None: + desktop = _get_desktop() + doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ()) + + try: + count = _pull.pull_project( + client, + doc, + code.strip(), + schema=_settings.get("default_schema", "kindred-rd"), + ) + _log(f"Pulled project {code}: {count} items") + + path = project_files.get_project_sheet_path(code.strip()) + project_files.ensure_project_dir(code.strip()) + url = uno.systemPathToFileUrl(str(path)) + doc.storeToURL(url, ()) + _log(f"Saved to {path}") + + _msgbox("Pull Project", f"Pulled {count} items for project {code}.") + except RuntimeError as e: + _msgbox("Pull Project Failed", str(e), box_type="errorbox") + + +def _cmd_push(frame): + """Push local changes back to the server.""" + from silo_calc import dialogs, sync_engine + from silo_calc import push as _push + from silo_calc import settings as _settings + from silo_calc import sheet_format as sf + from silo_calc.client import SiloClient + + client = SiloClient() + if not client.is_authenticated(): + dialogs.show_login_dialog() + client = SiloClient() + if not client.is_authenticated(): + return + + doc, sheet = _get_active_sheet() + if doc is None or sheet is None: + _msgbox("Push", "No active spreadsheet.", box_type="errorbox") + return + + rows = _push._read_sheet_rows(sheet) + classified = sync_engine.classify_rows(rows) + + modified_pns = [ + cells[sf.COL_PN].strip() + for _, status, cells in classified + if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip() + ] + server_ts = _push._fetch_server_timestamps(client, modified_pns) + diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts) + + ok = dialogs.show_push_summary( + new_count=len(diff["new"]), + modified_count=len(diff["modified"]), + conflict_count=len(diff["conflicts"]), + unchanged_count=diff["unchanged"], + ) + if not ok: + return + + try: + results = _push.push_sheet( + client, + doc, + sheet, + schema=_settings.get("default_schema", "kindred-rd"), + ) + except RuntimeError as e: + _msgbox("Push Failed", str(e), box_type="errorbox") + return + + try: + file_url = doc.getURL() + if file_url: + doc.store() + except Exception: + pass + + summary_lines = [ + f"Created: {results['created']}", + f"Updated: {results['updated']}", + f"Conflicts: {results.get('conflicts', 0)}", + f"Skipped: {results['skipped']}", + ] + if results["errors"]: + summary_lines.append(f"\nErrors ({len(results['errors'])}):") + for err in results["errors"][:10]: + summary_lines.append(f" - {err}") + if len(results["errors"]) > 10: + summary_lines.append(f" ... and {len(results['errors']) - 10} more") + + _msgbox("Push Complete", "\n".join(summary_lines)) + _log(f"Push complete: {results['created']} created, {results['updated']} updated") + + +def _cmd_add_item(frame): + """Completion wizard for adding a new BOM row.""" + from silo_calc import completion_wizard as _wizard + from silo_calc import settings as _settings + from silo_calc.client import SiloClient + + client = SiloClient() + if not client.is_authenticated(): + from silo_calc import dialogs + + dialogs.show_login_dialog() + client = SiloClient() + if not client.is_authenticated(): + return + + doc, sheet = _get_active_sheet() + if doc is None or sheet is None: + _msgbox("Add Item", "No active spreadsheet.", box_type="errorbox") + return + + project_code = "" + try: + file_url = doc.getURL() + if file_url: + file_path = uno.fileUrlToSystemPath(file_url) + parts = file_path.replace("\\", "/").split("/") + if "sheets" in parts: + idx = parts.index("sheets") + if idx + 1 < len(parts): + project_code = parts[idx + 1] + except Exception: + pass + + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + insert_row = cursor.getRangeAddress().EndRow + 1 + + ok = _wizard.run_completion_wizard( + client, + doc, + sheet, + insert_row, + project_code=project_code, + schema=_settings.get("default_schema", "kindred-rd"), + ) + if ok: + _log(f"Added new item at row {insert_row + 1}") + + +def _cmd_refresh(frame): + """Re-pull the current sheet from server.""" + _msgbox("Refresh", "Refresh -- coming soon.") + + +def _cmd_ai_description(frame): + """Generate an AI description from the seller description in the current row.""" + from silo_calc import ai_client as _ai + from silo_calc import dialogs + from silo_calc import pull as _pull + from silo_calc import sheet_format as sf + + if not _ai.is_configured(): + _msgbox( + "AI Describe", + "OpenRouter API key not configured.\n\n" + "Set it in Silo Settings or via the OPENROUTER_API_KEY environment variable.", + box_type="errorbox", + ) + return + + doc, sheet = _get_active_sheet() + if doc is None or sheet is None: + _msgbox("AI Describe", "No active spreadsheet.", box_type="errorbox") + return + + controller = doc.getCurrentController() + selection = controller.getSelection() + try: + cell_addr = selection.getCellAddress() + row = cell_addr.Row + except AttributeError: + try: + range_addr = selection.getRangeAddress() + row = range_addr.StartRow + except AttributeError: + _msgbox("AI Describe", "Select a cell in a BOM row.", box_type="errorbox") + return + + if row == 0: + _msgbox( + "AI Describe", "Select a data row, not the header.", box_type="errorbox" + ) + return + + seller_desc = sheet.getCellByPosition(sf.COL_SELLER_DESC, row).getString().strip() + if not seller_desc: + _msgbox( + "AI Describe", + f"No seller description in column F (row {row + 1}).", + box_type="errorbox", + ) + return + + existing_desc = sheet.getCellByPosition(sf.COL_DESCRIPTION, row).getString().strip() + part_number = sheet.getCellByPosition(sf.COL_PN, row).getString().strip() + category = part_number[:3] if len(part_number) >= 3 else "" + + while True: + try: + ai_desc = _ai.generate_description( + seller_description=seller_desc, + category=category, + existing_description=existing_desc, + part_number=part_number, + ) + except RuntimeError as e: + _msgbox("AI Describe Failed", str(e), box_type="errorbox") + return + + accepted = dialogs.show_ai_description_dialog(seller_desc, ai_desc) + if accepted is not None: + _pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, accepted) + _log(f"AI description written to row {row + 1}: {accepted}") + return + + retry = dialogs._input_box( + "AI Describe", + "Generate again? (yes/no):", + default="no", + ) + if not retry or retry.strip().lower() not in ("yes", "y"): + return + + +# Command dispatch table +_COMMANDS = { + "SiloLogin": _cmd_login, + "SiloPullBOM": _cmd_pull_bom, + "SiloPullProject": _cmd_pull_project, + "SiloPush": _cmd_push, + "SiloAddItem": _cmd_add_item, + "SiloRefresh": _cmd_refresh, + "SiloSettings": _cmd_settings, + "SiloAIDescription": _cmd_ai_description, +} + + +# --------------------------------------------------------------------------- +# UNO Dispatch implementation +# --------------------------------------------------------------------------- + + +class SiloDispatch(unohelper.Base, XDispatch): + """Handles a single dispatched command.""" + + def __init__(self, command: str, frame): + self._command = command + self._frame = frame + self._listeners = [] + + def dispatch(self, url, args): + handler = _COMMANDS.get(self._command) + if handler: + try: + handler(self._frame) + except Exception: + _log(f"Error in {self._command}:\n{traceback.format_exc()}") + try: + _msgbox( + f"Silo Error: {self._command}", + traceback.format_exc(), + box_type="errorbox", + ) + except Exception: + pass + + def addStatusListener(self, listener, url): + self._listeners.append(listener) + + def removeStatusListener(self, listener, url): + if listener in self._listeners: + self._listeners.remove(listener) + + +class SiloDispatchProvider( + unohelper.Base, XDispatchProvider, XInitialization, XServiceInfo +): + """ProtocolHandler component for Silo commands. + + LibreOffice instantiates this via com.sun.star.frame.ProtocolHandler + and calls initialize() with the frame, then queryDispatch() for each + command URL matching our protocol. + """ + + def __init__(self, ctx): + self._ctx = ctx + self._frame = None + + # XInitialization -- called by framework with the Frame + def initialize(self, args): + if args: + self._frame = args[0] + + # XDispatchProvider + def queryDispatch(self, url, target_frame_name, search_flags): + if url.Protocol == _PROTOCOL: + command = url.Path + if command in _COMMANDS: + return SiloDispatch(command, self._frame) + return None + + def queryDispatches(self, requests): + return [ + self.queryDispatch(r.FeatureURL, r.FrameName, r.SearchFlags) + for r in requests + ] + + # XServiceInfo + def getImplementationName(self): + return _IMPL_NAME + + def supportsService(self, name): + return name == _SERVICE_NAME + + def getSupportedServiceNames(self): + return (_SERVICE_NAME,) + + +# UNO component registration +g_ImplementationHelper = unohelper.ImplementationHelper() +g_ImplementationHelper.addImplementation( + SiloDispatchProvider, + _IMPL_NAME, + (_SERVICE_NAME,), +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_basics.py b/tests/test_basics.py new file mode 100644 index 0000000..b6aeac1 --- /dev/null +++ b/tests/test_basics.py @@ -0,0 +1,345 @@ +"""Basic tests for silo_calc modules (no UNO dependency).""" + +import hashlib +import json +import os +import sys +import tempfile +import unittest +from pathlib import Path + +# Add pythonpath to sys.path so we can import without LibreOffice +_pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +_pypath = os.path.join(_pkg_dir, "pythonpath") +if _pypath not in sys.path: + sys.path.insert(0, _pypath) + +from silo_calc import project_files, sync_engine +from silo_calc import settings as _settings +from silo_calc import sheet_format as sf + + +class TestSheetFormat(unittest.TestCase): + def test_bom_header_counts(self): + self.assertEqual(len(sf.BOM_VISIBLE_HEADERS), 11) + self.assertEqual(len(sf.BOM_PROPERTY_HEADERS), 13) + self.assertEqual(len(sf.BOM_SYNC_HEADERS), 4) + self.assertEqual(sf.BOM_TOTAL_COLS, 28) + + def test_column_indices(self): + self.assertEqual(sf.COL_ITEM, 0) + self.assertEqual(sf.COL_PN, 3) + self.assertEqual(sf.COL_UNIT_COST, 6) + self.assertEqual(sf.COL_QTY, 7) + self.assertEqual(sf.COL_EXT_COST, 8) + + def test_detect_sheet_type_bom(self): + headers = [ + "Item", + "Level", + "Source", + "PN", + "Description", + "Seller Description", + "Unit Cost", + "QTY", + "Ext Cost", + ] + self.assertEqual(sf.detect_sheet_type(headers), "bom") + + def test_detect_sheet_type_items(self): + headers = ["PN", "Description", "Type", "Source"] + self.assertEqual(sf.detect_sheet_type(headers), "items") + + def test_detect_sheet_type_unknown(self): + self.assertIsNone(sf.detect_sheet_type([])) + self.assertIsNone(sf.detect_sheet_type(["Foo", "Bar"])) + + def test_col_letter(self): + self.assertEqual(sf.col_letter(0), "A") + self.assertEqual(sf.col_letter(25), "Z") + self.assertEqual(sf.col_letter(26), "AA") + self.assertEqual(sf.col_letter(27), "AB") + + def test_property_key_map_bidirectional(self): + for header, key in sf.PROPERTY_KEY_MAP.items(): + self.assertEqual(sf.DB_FIELD_TO_HEADER[key], header) + + +class TestSyncEngine(unittest.TestCase): + def _make_row(self, pn="F01-0001", desc="Test", cost="10.00", qty="2"): + """Create a minimal BOM row with enough columns.""" + cells = [""] * sf.BOM_TOTAL_COLS + cells[sf.COL_PN] = pn + cells[sf.COL_DESCRIPTION] = desc + cells[sf.COL_UNIT_COST] = cost + cells[sf.COL_QTY] = qty + return cells + + def test_compute_row_hash_deterministic(self): + row = self._make_row() + h1 = sync_engine.compute_row_hash(row) + h2 = sync_engine.compute_row_hash(row) + self.assertEqual(h1, h2) + self.assertEqual(len(h1), 64) # SHA-256 hex + + def test_compute_row_hash_changes(self): + row1 = self._make_row(cost="10.00") + row2 = self._make_row(cost="20.00") + self.assertNotEqual( + sync_engine.compute_row_hash(row1), + sync_engine.compute_row_hash(row2), + ) + + def test_classify_row_new(self): + row = self._make_row() + # No stored hash -> new + self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_NEW) + + def test_classify_row_synced(self): + row = self._make_row() + # Set stored hash to current hash + row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) + row[sf.COL_ROW_STATUS] = "synced" + self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_SYNCED) + + def test_classify_row_modified(self): + row = self._make_row() + row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) + # Now change a cell + row[sf.COL_UNIT_COST] = "99.99" + self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_MODIFIED) + + def test_classify_rows_skips_header(self): + header = list(sf.BOM_ALL_HEADERS) + row1 = self._make_row() + all_rows = [header, row1] + classified = sync_engine.classify_rows(all_rows) + # Header row (index 0) should be skipped + self.assertEqual(len(classified), 1) + self.assertEqual(classified[0][0], 1) # row index + + def test_update_row_sync_state(self): + row = self._make_row() + updated = sync_engine.update_row_sync_state( + row, "synced", updated_at="2025-01-01T00:00:00Z", parent_pn="A01-0003" + ) + self.assertEqual(updated[sf.COL_ROW_STATUS], "synced") + self.assertEqual(updated[sf.COL_UPDATED_AT], "2025-01-01T00:00:00Z") + self.assertEqual(updated[sf.COL_PARENT_PN], "A01-0003") + # Hash should be set + self.assertEqual(len(updated[sf.COL_ROW_HASH]), 64) + + def test_build_push_diff(self): + row_new = self._make_row(pn="NEW-0001") + row_synced = self._make_row(pn="F01-0001") + row_synced[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_synced) + + row_modified = self._make_row(pn="F01-0002") + row_modified[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_modified) + row_modified[sf.COL_UNIT_COST] = "999.99" # change after hash + + classified = [ + (1, sync_engine.STATUS_NEW, row_new), + (2, sync_engine.STATUS_SYNCED, row_synced), + (3, sync_engine.STATUS_MODIFIED, row_modified), + ] + diff = sync_engine.build_push_diff(classified) + self.assertEqual(len(diff["new"]), 1) + self.assertEqual(len(diff["modified"]), 1) + self.assertEqual(diff["unchanged"], 1) + self.assertEqual(len(diff["conflicts"]), 0) + + def test_conflict_detection(self): + row = self._make_row(pn="F01-0001") + row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row) + row[sf.COL_UPDATED_AT] = "2025-01-01T00:00:00Z" + row[sf.COL_UNIT_COST] = "changed" # local modification + + classified = [(1, sync_engine.STATUS_MODIFIED, row)] + diff = sync_engine.build_push_diff( + classified, + server_timestamps={ + "F01-0001": "2025-06-01T00:00:00Z" + }, # server changed too + ) + self.assertEqual(len(diff["conflicts"]), 1) + self.assertEqual(len(diff["modified"]), 0) + + +class TestSettings(unittest.TestCase): + def test_load_defaults(self): + # Use a temp dir so we don't touch real settings + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + cfg = _settings.load() + self.assertEqual(cfg["ssl_verify"], True) + self.assertEqual(cfg["default_schema"], "kindred-rd") + + def test_save_and_load(self): + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("api_url", "https://silo.test/api") + cfg = _settings.load() + self.assertEqual(cfg["api_url"], "https://silo.test/api") + + def test_save_auth_and_clear(self): + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.save_auth("testuser", "editor", "local", "silo_abc123") + cfg = _settings.load() + self.assertEqual(cfg["auth_username"], "testuser") + self.assertEqual(cfg["api_token"], "silo_abc123") + + _settings.clear_auth() + cfg = _settings.load() + self.assertEqual(cfg["api_token"], "") + self.assertEqual(cfg["auth_username"], "") + + +class TestProjectFiles(unittest.TestCase): + def test_get_project_sheet_path(self): + path = project_files.get_project_sheet_path("3DX10") + self.assertTrue(str(path).endswith("sheets/3DX10/3DX10.ods")) + + def test_save_and_read(self): + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("projects_dir", tmp) + + test_data = b"PK\x03\x04fake-ods-content" + path = project_files.save_project_sheet("TEST", test_data) + self.assertTrue(path.is_file()) + self.assertEqual(path.name, "TEST.ods") + + read_back = project_files.read_project_sheet("TEST") + self.assertEqual(read_back, test_data) + + def test_list_project_sheets(self): + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("projects_dir", tmp) + + # Create two project dirs + for code in ("AAA", "BBB"): + d = Path(tmp) / "sheets" / code + d.mkdir(parents=True) + (d / f"{code}.ods").write_bytes(b"fake") + + sheets = project_files.list_project_sheets() + codes = [s[0] for s in sheets] + self.assertIn("AAA", codes) + self.assertIn("BBB", codes) + + +class TestAIClient(unittest.TestCase): + """Test ai_client helpers that don't require network or UNO.""" + + def test_default_constants(self): + from silo_calc import ai_client + + self.assertTrue(ai_client.OPENROUTER_API_URL.startswith("https://")) + self.assertTrue(len(ai_client.DEFAULT_MODEL) > 0) + self.assertTrue(len(ai_client.DEFAULT_INSTRUCTIONS) > 0) + + def test_is_configured_false_by_default(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + old = os.environ.pop("OPENROUTER_API_KEY", None) + try: + self.assertFalse(ai_client.is_configured()) + finally: + if old is not None: + os.environ["OPENROUTER_API_KEY"] = old + + def test_is_configured_with_env_var(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + old = os.environ.get("OPENROUTER_API_KEY") + os.environ["OPENROUTER_API_KEY"] = "sk-test-key" + try: + self.assertTrue(ai_client.is_configured()) + finally: + if old is not None: + os.environ["OPENROUTER_API_KEY"] = old + else: + os.environ.pop("OPENROUTER_API_KEY", None) + + def test_is_configured_with_settings(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("openrouter_api_key", "sk-test-key") + old = os.environ.pop("OPENROUTER_API_KEY", None) + try: + self.assertTrue(ai_client.is_configured()) + finally: + if old is not None: + os.environ["OPENROUTER_API_KEY"] = old + + def test_chat_completion_missing_key_raises(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + old = os.environ.pop("OPENROUTER_API_KEY", None) + try: + with self.assertRaises(RuntimeError) as ctx: + ai_client.chat_completion([{"role": "user", "content": "test"}]) + self.assertIn("not configured", str(ctx.exception)) + finally: + if old is not None: + os.environ["OPENROUTER_API_KEY"] = old + + def test_get_model_default(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + self.assertEqual(ai_client._get_model(), ai_client.DEFAULT_MODEL) + + def test_get_model_from_settings(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("openrouter_model", "anthropic/claude-3-haiku") + self.assertEqual(ai_client._get_model(), "anthropic/claude-3-haiku") + + def test_get_instructions_default(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + self.assertEqual( + ai_client._get_instructions(), ai_client.DEFAULT_INSTRUCTIONS + ) + + def test_get_instructions_from_settings(self): + from silo_calc import ai_client + + with tempfile.TemporaryDirectory() as tmp: + _settings._SETTINGS_DIR = Path(tmp) + _settings._SETTINGS_FILE = Path(tmp) / "test-settings.json" + _settings.put("openrouter_instructions", "Custom instructions") + self.assertEqual(ai_client._get_instructions(), "Custom instructions") + + +if __name__ == "__main__": + unittest.main()