refactor: remove LibreOffice Calc extension from server repo

Move pkg/calc/ to its own repository (19 files, ~4,600 lines):
- silo_calc_component.py, sync_engine.py, dialogs.py, push/pull
- AI client, completion wizard, settings, sheet format
- Addons.xcu, manifest.xml, description.xml, tests

Replace docs/CALC_EXTENSION.md with redirect to silo-calc repo
and reference to server-side ODS export endpoints.

The Calc extension now lives at:
https://git.kindred-systems.com/kindred/silo-calc
This commit is contained in:
Forbes
2026-02-06 16:19:15 -06:00
parent 503c3d1831
commit 60fa48285d
20 changed files with 2 additions and 4590 deletions

View File

@@ -1,191 +1,10 @@
# LibreOffice Calc Extension # LibreOffice Calc Extension
**Last Updated:** 2026-02-01 The Silo Calc extension has been moved to its own repository: [silo-calc](https://git.kindred-systems.com/kindred/silo-calc).
---
## Overview
The Silo Calc extension (`silo-calc.oxt`) is a LibreOffice Calc add-on that
connects project BOM spreadsheets directly to the Silo parts database.
Engineers work in their familiar spreadsheet environment while Silo handles
part number generation, revision tracking, and data synchronization.
The extension is a Python UNO component packaged as an `.oxt` file. It uses
only stdlib (`urllib`, `json`, `ssl`) -- no pip dependencies. The same
`SiloClient` pattern and auth flow from the FreeCAD workbench is reused.
---
## Architecture
```
Engineer's workstation Silo server (silod)
+--------------------------+ +------------------------+
| LibreOffice Calc | | Go API server |
| +----------------------+ | REST | +--------------------+ |
| | Silo Extension (.oxt)| <--------> | | ODS endpoints | |
| | - Pull/Push BOM | | API | | (internal/ods) | |
| | - Completion Wizard | | | +--------------------+ |
| | - AI Describe | | | | |
| +----------------------+ | | +--------------------+ |
| UNO API | cells | | | PostgreSQL | |
| +----------------------+ | | +--------------------+ |
| | Project Workbook | | +------------------------+
| | ~/projects/sheets/ | |
| | 3DX10/3DX10.ods | |
+--------------------------+
Extension also calls OpenRouter AI API directly for
description generation (does not go through silod).
```
---
## Extension Structure
```
pkg/calc/
META-INF/manifest.xml Extension manifest
description.xml Extension metadata (id, version, publisher)
description/description_en.txt English description
Addons.xcu Toolbar + menu registration
ProtocolHandler.xcu Dispatch protocol registration
silo_calc_component.py UNO DispatchProvider entry point
pythonpath/silo_calc/
__init__.py
ai_client.py OpenRouter API client
client.py SiloClient (HTTP, auth, SSL)
completion_wizard.py 3-step new item wizard
dialogs.py UNO dialog toolkit wrappers
project_files.py Local project file management
pull.py Sheet population from server
push.py Sheet changes back to server
settings.py JSON settings (~/.config/silo/calc-settings.json)
sheet_format.py Column layout constants
sync_engine.py Row hashing, classification, diff
tests/
test_basics.py 31 unit tests (no UNO/network required)
```
---
## Toolbar Commands
| Button | Command | Description |
|--------|---------|-------------|
| Login | `SiloLogin` | Username/password dialog, creates API token |
| Pull BOM | `SiloPullBOM` | Assembly picker -> expanded BOM -> populates sheet |
| Pull Project | `SiloPullProject` | Project picker -> all project items -> multi-sheet workbook |
| Push | `SiloPush` | Classifies rows -> creates/updates items -> auto-tags project |
| Add Item | `SiloAddItem` | Completion wizard (category -> description -> fields) |
| Refresh | `SiloRefresh` | Re-pull (placeholder) |
| Settings | `SiloSettings` | API URL, token, SSL, OpenRouter config |
| AI Describe | `SiloAIDescription` | AI description from seller description |
---
## BOM Sheet Format
28 columns total: 11 visible core, 13 hidden properties, 4 hidden sync tracking.
### Visible Columns
| Col | Header | Notes |
|-----|--------|-------|
| A | Item | Assembly/section header |
| B | Level | BOM depth (0=top) |
| C | Source | M=manufactured, P=purchased |
| D | PN | Part number (read-only for existing) |
| E | Description | Required for new items |
| F | Seller Description | Vendor catalog text |
| G | Unit Cost | Currency |
| H | QTY | Decimal quantity |
| I | Ext Cost | Formula =G*H (not stored) |
| J | Sourcing Link | URL |
| K | Schema | Schema name |
### Hidden Property Columns (L-X)
Manufacturer, Manufacturer PN, Supplier, Supplier PN, Lead Time, Min Order
Qty, Lifecycle Status, RoHS, Country of Origin, Material, Finish, Notes,
Long Description. Populated from revision properties, collapsed by default.
### Hidden Sync Columns (Y-AB)
`_silo_row_hash` (SHA-256), `_silo_row_status`, `_silo_updated_at`,
`_silo_parent_pn`. Used for change detection and conflict resolution.
### Row Status Colors
| Status | Color | Hex |
|--------|-------|-----|
| synced | light green | #C6EFCE |
| modified | light yellow | #FFEB9C |
| new | light blue | #BDD7EE |
| error | light red | #FFC7CE |
| conflict | orange | #F4B084 |
---
## Completion Wizard
Three-step guided workflow for adding new BOM rows:
1. **Category** -- select from schema categories (F01-X08)
2. **Description** -- required text, with AI generation offer when blank
3. **Common fields** -- sourcing type, unit cost, quantity, sourcing link
If a manually entered PN already exists in the database, the PN Conflict
Resolution dialog offers: use existing item, auto-generate new PN, or cancel.
New items are automatically tagged with the workbook's project code.
---
## OpenRouter AI Integration
The extension calls the OpenRouter API (OpenAI-compatible) to generate
standardized part descriptions from verbose seller descriptions. This is
useful because seller descriptions are typically detailed catalog text while
BOM descriptions need to be concise (max 60 chars, title case, component
type first, standard abbreviations).
### Configuration
Settings dialog fields (or `OPENROUTER_API_KEY` env var):
- **API Key** -- OpenRouter bearer token (masked in UI)
- **AI Model** -- default `openai/gpt-4.1-nano`
- **AI Instructions** -- customizable system prompt
### Workflow
1. Paste seller description into column F
2. Click "AI Describe" on toolbar
3. Review side-by-side dialog (seller text left, AI result right)
4. Edit if needed, click Accept
5. Description written to column E
The AI client (`ai_client.py`) is designed for reuse. The generic
`chat_completion()` function can be called by future features (price
analysis, sourcing assistance) without modification.
---
## Server-Side ODS Support ## Server-Side ODS Support
Pure Go ODS library at `internal/ods/` for server-side spreadsheet generation. The server-side ODS library (`internal/ods/`) and ODS endpoints remain in this repository. See `docs/SPECIFICATION.md` Section 11 for the full endpoint listing.
No headless LibreOffice dependency -- ODS is a ZIP of XML files.
### Library (`internal/ods/`)
- `ods.go` -- types: Workbook, Sheet, Column, Row, Cell, CellType
- `writer.go` -- generates valid ODS ZIP archives
- `reader.go` -- parses ODS back to Go structs
- `ods_test.go` -- 10 round-trip tests
### ODS Endpoints
| Method | Path | Description | | Method | Path | Description |
|--------|------|-------------| |--------|------|-------------|
@@ -195,61 +14,3 @@ No headless LibreOffice dependency -- ODS is a ZIP of XML files.
| GET | `/api/items/{pn}/bom/export.ods` | BOM as formatted ODS | | GET | `/api/items/{pn}/bom/export.ods` | BOM as formatted ODS |
| GET | `/api/projects/{code}/sheet.ods` | Multi-sheet project workbook | | GET | `/api/projects/{code}/sheet.ods` | Multi-sheet project workbook |
| POST | `/api/sheets/diff` | Upload ODS, return JSON diff | | POST | `/api/sheets/diff` | Upload ODS, return JSON diff |
---
## Build and Install
```makefile
make build-calc-oxt # zip pkg/calc/ into silo-calc.oxt
make install-calc # unopkg add silo-calc.oxt
make uninstall-calc # unopkg remove io.kindredsystems.silo.calc
make test-calc # python3 -m unittest (31 tests)
```
---
## Implementation Status
| Component | Status | Notes |
|-----------|--------|-------|
| Extension skeleton | Done | manifest, description, Addons.xcu, ProtocolHandler.xcu |
| SiloClient | Done | HTTP client adapted from FreeCAD workbench |
| Settings | Done | JSON persistence, env var fallbacks |
| Login dialog | Done | Two-step username/password |
| Settings dialog | Done | API URL, token, SSL, OpenRouter fields |
| Pull BOM | Done | Full column set, hidden groups, hash tracking |
| Pull Project | Done | Items sheet + BOM sheet |
| Push | Done | Create/update, auto project tagging, conflict detection |
| Completion wizard | Done | 3-step with PN conflict resolution |
| AI description | Done | OpenRouter client, review dialog, toolbar button |
| Refresh | Stub | Placeholder only |
| Go ODS library | Done | Writer, reader, 10 round-trip tests |
| ODS endpoints | Done | 6 handlers registered |
| Makefile targets | Done | build, install, uninstall, test, clean |
### Known Issues
- Refresh command is a placeholder (shows "coming soon")
- No integration tests with a running Silo instance yet
- `completion_wizard.py` uses simple input boxes instead of proper list dialogs
- Push does not yet handle BOM relationship creation (item fields only)
---
## Testing
31 unit tests in `pkg/calc/tests/test_basics.py`, runnable without UNO or
network access:
- TestSheetFormat (7) -- column indices, headers, sheet type detection
- TestSyncEngine (9) -- hashing, classification, diff, conflict detection
- TestSettings (3) -- load/save/auth
- TestProjectFiles (3) -- path resolution, read/write
- TestAIClient (9) -- constants, configuration, error handling
```
$ python3 -m unittest pkg/calc/tests/test_basics.py -v
Ran 31 tests in 0.031s
OK
```

View File

@@ -1,235 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<oor:component-data xmlns:oor="http://openoffice.org/2001/registry"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
oor:name="Addons" oor:package="org.openoffice.Office">
<!-- Toolbar definition -->
<node oor:name="AddonUI">
<node oor:name="OfficeToolBar">
<node oor:name="io.kindredsystems.silo.calc.toolbar" oor:op="replace">
<node oor:name="m01" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloLogin</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Login</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m02" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPullBOM</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Pull BOM</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m03" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPullProject</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Pull Project</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m04" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPush</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Push</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m05" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloAddItem</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Add Item</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m06" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloRefresh</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Refresh</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m07" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloSettings</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Settings</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="m08" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloAIDescription</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">AI Describe</value>
</prop>
<prop oor:name="Target" oor:type="xs:string">
<value>_self</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
</node>
</node>
<!-- Menu entries under Tools menu -->
<node oor:name="AddonMenu">
<node oor:name="io.kindredsystems.silo.calc.menu.login" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloLogin</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Login</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.pullbom" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPullBOM</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Pull BOM</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.pullproject" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPullProject</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Pull Project Items</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.push" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloPush</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Push Changes</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.additem" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloAddItem</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Add Item</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.refresh" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloRefresh</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Refresh</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.settings" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloSettings</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: Settings</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
<node oor:name="io.kindredsystems.silo.calc.menu.aidescription" oor:op="replace">
<prop oor:name="URL" oor:type="xs:string">
<value>io.kindredsystems.silo.calc:SiloAIDescription</value>
</prop>
<prop oor:name="Title" oor:type="xs:string">
<value xml:lang="en">Silo: AI Describe</value>
</prop>
<prop oor:name="Context" oor:type="xs:string">
<value>com.sun.star.sheet.SpreadsheetDocument</value>
</prop>
</node>
</node>
</node>
</oor:component-data>

View File

@@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE manifest:manifest PUBLIC "-//OpenOffice.org//DTD Manifest 1.0//EN" "Manifest.dtd">
<manifest:manifest xmlns:manifest="http://openoffice.org/2001/manifest">
<manifest:file-entry manifest:full-path="Addons.xcu" manifest:media-type="application/vnd.sun.star.configuration-data"/>
<manifest:file-entry manifest:full-path="ProtocolHandler.xcu" manifest:media-type="application/vnd.sun.star.configuration-data"/>
<manifest:file-entry manifest:full-path="silo_calc_component.py" manifest:media-type="application/vnd.sun.star.uno-component;type=Python"/>
</manifest:manifest>

View File

@@ -1,14 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<oor:component-data xmlns:oor="http://openoffice.org/2001/registry"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
oor:name="ProtocolHandler"
oor:package="org.openoffice.Office">
<node oor:name="HandlerSet">
<node oor:name="io.kindredsystems.silo.calc.Component" oor:op="replace">
<prop oor:name="Protocols" oor:type="oor:string-list">
<value>io.kindredsystems.silo.calc:*</value>
</prop>
</node>
</node>
</oor:component-data>

View File

@@ -1,27 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<description xmlns="http://openoffice.org/extensions/description/2006"
xmlns:dep="http://openoffice.org/extensions/description/2006"
xmlns:xlink="http://www.w3.org/1999/xlink">
<identifier value="io.kindredsystems.silo.calc"/>
<version value="0.1.0"/>
<display-name>
<name lang="en">Silo - Spreadsheet Sync</name>
</display-name>
<publisher>
<name xlink:href="https://kindredsystems.io" lang="en">Kindred Systems</name>
</publisher>
<extension-description>
<src lang="en" xlink:href="description/description_en.txt"/>
</extension-description>
<dependencies>
<OpenOffice.org-minimal-version value="4.1" dep:name="OpenOffice.org 4.1"/>
</dependencies>
<platform value="all"/>
</description>

View File

@@ -1,15 +0,0 @@
Silo Spreadsheet Sync for LibreOffice Calc
Bidirectional sync between LibreOffice Calc spreadsheets and the Silo
parts database. Pull project BOMs, edit in Calc, push changes back.
Features:
- Pull BOM: fetch an expanded bill of materials as a formatted sheet
- Pull Project: fetch all items tagged with a project code
- Push: sync local edits (new items, modified fields) back to the database
- Add Item wizard: guided workflow for adding new BOM entries
- PN conflict resolution: handle duplicate part numbers gracefully
- Auto project tagging: items in a working BOM are tagged with the project
Toolbar commands appear when a Calc spreadsheet is active.
Settings and API token are stored in ~/.config/silo/calc-settings.json.

View File

@@ -1,3 +0,0 @@
"""Silo LibreOffice Calc extension -- spreadsheet sync for project data."""
__version__ = "0.1.0"

View File

@@ -1,217 +0,0 @@
"""OpenRouter AI client for the Silo Calc extension.
Provides AI-powered text generation via the OpenRouter API
(https://openrouter.ai/api/v1/chat/completions). Uses stdlib urllib
only -- no external dependencies.
The core ``chat_completion()`` function is generic and reusable for
future features (price analysis, sourcing assistance). Domain helpers
like ``generate_description()`` build on top of it.
"""
import json
import os
import ssl
import urllib.error
import urllib.request
from typing import Any, Dict, List, Optional
from . import settings as _settings
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
DEFAULT_MODEL = "openai/gpt-4.1-nano"
DEFAULT_INSTRUCTIONS = (
"You are a parts librarian for an engineering company. "
"Given a seller's product description, produce a concise, standardized "
"part description suitable for a Bill of Materials. Rules:\n"
"- Maximum 60 characters\n"
"- Use title case\n"
"- Start with the component type (e.g., Bolt, Resistor, Bearing)\n"
"- Include key specifications (size, rating, material) in order of importance\n"
"- Omit brand names, marketing language, and redundant words\n"
"- Use standard engineering abbreviations (SS, Al, M3, 1/4-20)\n"
"- Output ONLY the description, no quotes or explanation"
)
# ---------------------------------------------------------------------------
# SSL helper (same pattern as client.py)
# ---------------------------------------------------------------------------
def _get_ssl_context() -> ssl.SSLContext:
"""Build an SSL context for OpenRouter API calls."""
ctx = ssl.create_default_context()
for ca_path in (
"/etc/ssl/certs/ca-certificates.crt",
"/etc/pki/tls/certs/ca-bundle.crt",
):
if os.path.isfile(ca_path):
try:
ctx.load_verify_locations(ca_path)
except Exception:
pass
break
return ctx
# ---------------------------------------------------------------------------
# Settings resolution helpers
# ---------------------------------------------------------------------------
def _get_api_key() -> str:
"""Resolve the OpenRouter API key from settings or environment."""
cfg = _settings.load()
key = cfg.get("openrouter_api_key", "")
if not key:
key = os.environ.get("OPENROUTER_API_KEY", "")
return key
def _get_model() -> str:
"""Resolve the model slug from settings or default."""
cfg = _settings.load()
return cfg.get("openrouter_model", "") or DEFAULT_MODEL
def _get_instructions() -> str:
"""Resolve the system instructions from settings or default."""
cfg = _settings.load()
return cfg.get("openrouter_instructions", "") or DEFAULT_INSTRUCTIONS
# ---------------------------------------------------------------------------
# Core API function
# ---------------------------------------------------------------------------
def chat_completion(
messages: List[Dict[str, str]],
model: Optional[str] = None,
temperature: float = 0.3,
max_tokens: int = 200,
) -> str:
"""Send a chat completion request to OpenRouter.
Parameters
----------
messages : list of {"role": str, "content": str}
model : model slug (default: from settings or DEFAULT_MODEL)
temperature : sampling temperature
max_tokens : maximum response tokens
Returns
-------
str : the assistant's response text
Raises
------
RuntimeError : on missing API key, HTTP errors, network errors,
or unexpected response format.
"""
api_key = _get_api_key()
if not api_key:
raise RuntimeError(
"OpenRouter API key not configured. "
"Set it in Settings or the OPENROUTER_API_KEY environment variable."
)
model = model or _get_model()
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/kindredsystems/silo",
"X-Title": "Silo Calc Extension",
}
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
OPENROUTER_API_URL, data=body, headers=headers, method="POST"
)
try:
with urllib.request.urlopen(
req, context=_get_ssl_context(), timeout=30
) as resp:
result = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8", errors="replace")
if e.code == 401:
raise RuntimeError("OpenRouter API key is invalid or expired.")
if e.code == 402:
raise RuntimeError("OpenRouter account has insufficient credits.")
if e.code == 429:
raise RuntimeError("OpenRouter rate limit exceeded. Try again shortly.")
raise RuntimeError(f"OpenRouter API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Network error contacting OpenRouter: {e.reason}")
choices = result.get("choices", [])
if not choices:
raise RuntimeError("OpenRouter returned an empty response.")
return choices[0].get("message", {}).get("content", "").strip()
# ---------------------------------------------------------------------------
# Domain helpers
# ---------------------------------------------------------------------------
def generate_description(
seller_description: str,
category: str = "",
existing_description: str = "",
part_number: str = "",
) -> str:
"""Generate a standardized part description from a seller description.
Parameters
----------
seller_description : the raw seller/vendor description text
category : category code (e.g. "F01") for context
existing_description : current description in col E, if any
part_number : the part number, for context
Returns
-------
str : the AI-generated standardized description
"""
system_prompt = _get_instructions()
user_parts = []
if category:
user_parts.append(f"Category: {category}")
if part_number:
user_parts.append(f"Part Number: {part_number}")
if existing_description:
user_parts.append(f"Current Description: {existing_description}")
user_parts.append(f"Seller Description: {seller_description}")
user_prompt = "\n".join(user_parts)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
return chat_completion(messages)
def is_configured() -> bool:
"""Return True if the OpenRouter API key is available."""
return bool(_get_api_key())

View File

@@ -1,447 +0,0 @@
"""Silo API client for LibreOffice Calc extension.
Adapted from pkg/freecad/silo_commands.py SiloClient. Uses urllib (no
external dependencies) and the same auth flow: session login to obtain a
persistent API token stored in a local JSON settings file.
"""
import http.cookiejar
import json
import os
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
from . import settings as _settings
# ---------------------------------------------------------------------------
# SSL helpers
# ---------------------------------------------------------------------------
def _get_ssl_context() -> ssl.SSLContext:
"""Build an SSL context honouring the user's verify/cert preferences."""
cfg = _settings.load()
if not cfg.get("ssl_verify", True):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
ctx = ssl.create_default_context()
custom_cert = cfg.get("ssl_cert_path", "")
if custom_cert and os.path.isfile(custom_cert):
try:
ctx.load_verify_locations(custom_cert)
except Exception:
pass
# Load system CA bundles (bundled Python may not find them automatically)
for ca_path in (
"/etc/ssl/certs/ca-certificates.crt",
"/etc/pki/tls/certs/ca-bundle.crt",
):
if os.path.isfile(ca_path):
try:
ctx.load_verify_locations(ca_path)
except Exception:
pass
break
return ctx
# ---------------------------------------------------------------------------
# SiloClient
# ---------------------------------------------------------------------------
class SiloClient:
"""HTTP client for the Silo REST API."""
def __init__(self, base_url: str = None):
self._explicit_url = base_url
# -- URL helpers --------------------------------------------------------
@property
def base_url(self) -> str:
if self._explicit_url:
return self._explicit_url.rstrip("/")
cfg = _settings.load()
url = cfg.get("api_url", "").rstrip("/")
if not url:
url = os.environ.get("SILO_API_URL", "http://localhost:8080/api")
# Auto-append /api for bare origins
parsed = urllib.parse.urlparse(url)
if not parsed.path or parsed.path == "/":
url = url + "/api"
return url
@property
def _origin(self) -> str:
"""Server origin (without /api) for auth endpoints."""
base = self.base_url
return base.rsplit("/api", 1)[0] if base.endswith("/api") else base
# -- Auth headers -------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
token = _settings.load().get("api_token", "") or os.environ.get(
"SILO_API_TOKEN", ""
)
if token:
return {"Authorization": f"Bearer {token}"}
return {}
# -- Core HTTP ----------------------------------------------------------
def _request(
self,
method: str,
path: str,
data: Optional[Dict] = None,
raw: bool = False,
) -> Any:
"""Make an authenticated JSON request. Returns parsed JSON.
If *raw* is True the response bytes are returned instead.
"""
url = f"{self.base_url}{path}"
headers = {"Content-Type": "application/json"}
headers.update(self._auth_headers())
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
payload = resp.read()
if raw:
return payload
return json.loads(payload.decode())
except urllib.error.HTTPError as e:
if e.code == 401:
_settings.clear_auth()
error_body = e.read().decode()
raise RuntimeError(f"API error {e.code}: {error_body}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _download(self, path: str) -> bytes:
"""Download raw bytes from an API path."""
url = f"{self.base_url}{path}"
req = urllib.request.Request(url, headers=self._auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
return resp.read()
except urllib.error.HTTPError as e:
raise RuntimeError(f"Download error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
def _upload_ods(
self, path: str, ods_bytes: bytes, filename: str = "upload.ods"
) -> Any:
"""POST an ODS file as multipart/form-data."""
boundary = "----SiloCalcUpload" + str(abs(hash(filename)))[-8:]
parts = []
parts.append(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/vnd.oasis.opendocument.spreadsheet\r\n\r\n"
)
parts.append(ods_bytes)
parts.append(f"\r\n--{boundary}--\r\n")
body = b""
for p in parts:
body += p.encode("utf-8") if isinstance(p, str) else p
url = f"{self.base_url}{path}"
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
}
headers.update(self._auth_headers())
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, context=_get_ssl_context()) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Upload error {e.code}: {e.read().decode()}")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# -- Authentication -----------------------------------------------------
def login(self, username: str, password: str) -> Dict[str, Any]:
"""Session login + create persistent API token (same flow as FreeCAD)."""
ctx = _get_ssl_context()
cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(cookie_jar),
urllib.request.HTTPSHandler(context=ctx),
)
# Step 1: POST credentials to /login
login_url = f"{self._origin}/login"
form_data = urllib.parse.urlencode(
{"username": username, "password": password}
).encode()
req = urllib.request.Request(
login_url,
data=form_data,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
opener.open(req)
except urllib.error.HTTPError as e:
if e.code not in (302, 303):
raise RuntimeError(f"Login failed (HTTP {e.code})")
except urllib.error.URLError as e:
raise RuntimeError(f"Connection error: {e.reason}")
# Step 2: Verify via /api/auth/me
me_req = urllib.request.Request(f"{self._origin}/api/auth/me", method="GET")
try:
with opener.open(me_req) as resp:
user_info = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
raise RuntimeError("Login failed: invalid username or password")
raise RuntimeError(f"Login verification failed (HTTP {e.code})")
# Step 3: Create API token
hostname = socket.gethostname()
token_body = json.dumps(
{"name": f"LibreOffice Calc ({hostname})", "expires_in_days": 90}
).encode()
token_req = urllib.request.Request(
f"{self._origin}/api/auth/tokens",
data=token_body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with opener.open(token_req) as resp:
token_result = json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"Failed to create API token (HTTP {e.code})")
raw_token = token_result.get("token", "")
if not raw_token:
raise RuntimeError("Server did not return an API token")
_settings.save_auth(
username=user_info.get("username", username),
role=user_info.get("role", ""),
source=user_info.get("auth_source", ""),
token=raw_token,
)
return {
"username": user_info.get("username", username),
"role": user_info.get("role", ""),
"auth_source": user_info.get("auth_source", ""),
"token_name": token_result.get("name", ""),
}
def logout(self):
_settings.clear_auth()
def is_authenticated(self) -> bool:
cfg = _settings.load()
return bool(cfg.get("api_token") or os.environ.get("SILO_API_TOKEN"))
def get_current_user(self) -> Optional[Dict[str, Any]]:
try:
return self._request("GET", "/auth/me")
except RuntimeError:
return None
def check_connection(self) -> Tuple[bool, str]:
url = f"{self._origin}/health"
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(
req, context=_get_ssl_context(), timeout=5
) as resp:
return True, f"OK ({resp.status})"
except urllib.error.HTTPError as e:
return True, f"Server error ({e.code})"
except urllib.error.URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
# -- Items --------------------------------------------------------------
def get_item(self, part_number: str) -> Dict[str, Any]:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}"
)
def list_items(self, search: str = "", project: str = "", limit: int = 100) -> list:
params = [f"limit={limit}"]
if search:
params.append(f"search={urllib.parse.quote(search)}")
if project:
params.append(f"project={urllib.parse.quote(project)}")
return self._request("GET", "/items?" + "&".join(params))
def create_item(
self,
schema: str,
category: str,
description: str = "",
projects: Optional[List[str]] = None,
sourcing_type: str = "",
sourcing_link: str = "",
standard_cost: Optional[float] = None,
long_description: str = "",
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"schema": schema,
"category": category,
"description": description,
}
if projects:
data["projects"] = projects
if sourcing_type:
data["sourcing_type"] = sourcing_type
if sourcing_link:
data["sourcing_link"] = sourcing_link
if standard_cost is not None:
data["standard_cost"] = standard_cost
if long_description:
data["long_description"] = long_description
return self._request("POST", "/items", data)
def update_item(self, part_number: str, **fields) -> Dict[str, Any]:
return self._request(
"PUT", f"/items/{urllib.parse.quote(part_number, safe='')}", fields
)
# -- Projects -----------------------------------------------------------
def get_projects(self) -> list:
return self._request("GET", "/projects")
def get_project_items(self, code: str) -> list:
return self._request(
"GET", f"/projects/{urllib.parse.quote(code, safe='')}/items"
)
def add_item_projects(
self, part_number: str, project_codes: List[str]
) -> Dict[str, Any]:
return self._request(
"POST",
f"/items/{urllib.parse.quote(part_number, safe='')}/projects",
{"projects": project_codes},
)
def get_item_projects(self, part_number: str) -> list:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/projects"
)
# -- Schemas ------------------------------------------------------------
def get_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
return self._request("GET", f"/schemas/{urllib.parse.quote(name, safe='')}")
def get_property_schema(self, name: str = "kindred-rd") -> Dict[str, Any]:
return self._request(
"GET", f"/schemas/{urllib.parse.quote(name, safe='')}/properties"
)
# -- BOM ----------------------------------------------------------------
def get_bom(self, part_number: str) -> list:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/bom"
)
def get_bom_expanded(self, part_number: str, depth: int = 10) -> list:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/bom/expanded?depth={depth}",
)
def add_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
rel_type: str = "component",
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"child_part_number": child_pn,
"rel_type": rel_type,
}
if quantity is not None:
data["quantity"] = quantity
if metadata:
data["metadata"] = metadata
return self._request(
"POST", f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom", data
)
def update_bom_entry(
self,
parent_pn: str,
child_pn: str,
quantity: Optional[float] = None,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if quantity is not None:
data["quantity"] = quantity
if metadata:
data["metadata"] = metadata
return self._request(
"PUT",
f"/items/{urllib.parse.quote(parent_pn, safe='')}/bom/{urllib.parse.quote(child_pn, safe='')}",
data,
)
# -- Revisions ----------------------------------------------------------
def get_revisions(self, part_number: str) -> list:
return self._request(
"GET", f"/items/{urllib.parse.quote(part_number, safe='')}/revisions"
)
def get_revision(self, part_number: str, revision: int) -> Dict[str, Any]:
return self._request(
"GET",
f"/items/{urllib.parse.quote(part_number, safe='')}/revisions/{revision}",
)
# -- ODS endpoints ------------------------------------------------------
def download_bom_ods(self, part_number: str) -> bytes:
return self._download(
f"/items/{urllib.parse.quote(part_number, safe='')}/bom/export.ods"
)
def download_project_sheet(self, project_code: str) -> bytes:
return self._download(
f"/projects/{urllib.parse.quote(project_code, safe='')}/sheet.ods"
)
def upload_sheet_diff(
self, ods_bytes: bytes, filename: str = "sheet.ods"
) -> Dict[str, Any]:
return self._upload_ods("/sheets/diff", ods_bytes, filename)
# -- Part number generation ---------------------------------------------
def generate_part_number(self, schema: str, category: str) -> Dict[str, Any]:
return self._request(
"POST",
"/generate-part-number",
{"schema": schema, "category": category},
)

View File

@@ -1,395 +0,0 @@
"""Completion Wizard for adding new items to a BOM sheet.
Three-step guided workflow:
1. Category selection (from schema)
2. Required fields (Description, optional PN)
3. Common fields (Source, Unit Cost, QTY, Sourcing Link, category-specific properties)
If a manually entered PN already exists, the PN Conflict Resolution dialog
is shown.
"""
from typing import Any, Dict, List, Optional, Tuple
from . import ai_client as _ai
from . import dialogs, sync_engine
from . import settings as _settings
from . import sheet_format as sf
from .client import SiloClient
# UNO imports
try:
import uno
_HAS_UNO = True
_HAS_UNO = True
except ImportError:
_HAS_UNO = False
# Category prefix descriptions for grouping in the picker
_PREFIX_GROUPS = {
"F": "Fasteners",
"C": "Fittings",
"R": "Motion",
"S": "Structural",
"E": "Electrical",
"M": "Mechanical",
"T": "Tooling",
"A": "Assemblies",
"P": "Purchased",
"X": "Custom Fabricated",
}
# Default sourcing type by category prefix
_DEFAULT_SOURCING = {
"A": "M", # assemblies are manufactured
"X": "M", # custom fab is manufactured
"T": "M", # tooling is manufactured
}
def _get_categories(
client: SiloClient, schema: str = "kindred-rd"
) -> List[Tuple[str, str]]:
"""Fetch category codes and descriptions from the schema.
Returns list of (code, description) tuples sorted by code.
"""
try:
schema_data = client.get_schema(schema)
segments = schema_data.get("segments", [])
cat_segment = None
for seg in segments:
if seg.get("name") == "category":
cat_segment = seg
break
if cat_segment and cat_segment.get("values"):
return sorted(cat_segment["values"].items())
except RuntimeError:
pass
return []
def _get_category_properties(
client: SiloClient, category: str, schema: str = "kindred-rd"
) -> List[str]:
"""Fetch property field names relevant to a category.
Returns the list of property keys that apply to the category's prefix group.
"""
try:
prop_schema = client.get_property_schema(schema)
# prop_schema has global defaults and category-specific overrides
defaults = prop_schema.get("defaults", {})
category_props = prop_schema.get("categories", {}).get(category[:1], {})
# Merge: category-specific fields + global defaults
all_keys = set(defaults.keys())
all_keys.update(category_props.keys())
return sorted(all_keys)
except RuntimeError:
return list(sf.PROPERTY_KEY_MAP.values())
# ---------------------------------------------------------------------------
# Wizard dialog (UNO)
# ---------------------------------------------------------------------------
def run_completion_wizard(
client: SiloClient,
doc,
sheet,
insert_row: int,
project_code: str = "",
schema: str = "kindred-rd",
) -> bool:
"""Run the item completion wizard. Returns True if a row was inserted.
Parameters
----------
client : SiloClient
doc : XSpreadsheetDocument
sheet : XSpreadsheet
insert_row : int (0-based row index to insert at)
project_code : str (for auto-tagging)
schema : str
"""
if not _HAS_UNO:
return False
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
# -- Step 1: Category selection -----------------------------------------
categories = _get_categories(client, schema)
if not categories:
dialogs._msgbox(
None,
"Add Item",
"Could not fetch categories from server.",
box_type="errorbox",
)
return False
# Build display list grouped by prefix
cat_display = []
for code, desc in categories:
prefix = code[0] if code else "?"
group = _PREFIX_GROUPS.get(prefix, "Other")
cat_display.append(f"{code} - {desc} [{group}]")
# Use a simple input box with the category list as hint
# (A proper ListBox dialog would be more polished but this is functional)
cat_hint = ", ".join(c[0] for c in categories[:20])
if len(categories) > 20:
cat_hint += f"... ({len(categories)} total)"
category_input = dialogs._input_box(
"Add Item - Step 1/3",
f"Category code ({cat_hint}):",
)
if not category_input:
return False
category = category_input.strip().upper()
# Validate category
valid_codes = {c[0] for c in categories}
if category not in valid_codes:
dialogs._msgbox(
None,
"Add Item",
f"Unknown category: {category}",
box_type="errorbox",
)
return False
# -- Step 2: Required fields --------------------------------------------
description = dialogs._input_box(
"Add Item - Step 2/3",
"Description (required, leave blank to use AI):",
)
# If blank and AI is configured, offer AI generation from seller description
if (not description or not description.strip()) and _ai.is_configured():
seller_desc = dialogs._input_box(
"Add Item - AI Description",
"Paste the seller description for AI generation:",
)
if seller_desc and seller_desc.strip():
try:
ai_desc = _ai.generate_description(
seller_description=seller_desc.strip(),
category=category,
)
accepted = dialogs.show_ai_description_dialog(
seller_desc.strip(), ai_desc
)
if accepted:
description = accepted
except RuntimeError as e:
dialogs._msgbox(
None,
"AI Description Failed",
str(e),
box_type="errorbox",
)
if not description or not description.strip():
dialogs._msgbox(
None, "Add Item", "Description is required.", box_type="errorbox"
)
return False
manual_pn = dialogs._input_box(
"Add Item - Step 2/3",
"Part number (leave blank for auto-generation):",
)
# Check for PN conflict if user entered one
use_existing_item = None
if manual_pn and manual_pn.strip():
manual_pn = manual_pn.strip()
try:
existing = client.get_item(manual_pn)
# PN exists -- show conflict dialog
result = dialogs.show_pn_conflict_dialog(manual_pn, existing)
if result == dialogs.PN_USE_EXISTING:
use_existing_item = existing
elif result == dialogs.PN_CREATE_NEW:
manual_pn = "" # will auto-generate
else:
return False # cancelled
except RuntimeError:
pass # PN doesn't exist, which is fine
# -- Step 3: Common fields ----------------------------------------------
prefix = category[0] if category else ""
default_source = _DEFAULT_SOURCING.get(prefix, "P")
source = dialogs._input_box(
"Add Item - Step 3/3",
f"Sourcing type (M=manufactured, P=purchased) [default: {default_source}]:",
default=default_source,
)
if source is None:
return False
source = source.strip().upper() or default_source
unit_cost_str = dialogs._input_box(
"Add Item - Step 3/3",
"Unit cost (e.g. 10.50):",
default="0",
)
unit_cost = 0.0
if unit_cost_str:
try:
unit_cost = float(unit_cost_str.strip().replace("$", "").replace(",", ""))
except ValueError:
pass
qty_str = dialogs._input_box(
"Add Item - Step 3/3",
"Quantity [default: 1]:",
default="1",
)
qty = 1.0
if qty_str:
try:
qty = float(qty_str.strip())
except ValueError:
pass
sourcing_link = (
dialogs._input_box(
"Add Item - Step 3/3",
"Sourcing link (URL, optional):",
)
or ""
)
# -- Create item or use existing ----------------------------------------
created_item = None
if use_existing_item:
# Use the existing item's data
created_item = use_existing_item
final_pn = use_existing_item.get("part_number", manual_pn)
elif manual_pn:
# Create with the user's manual PN
try:
created_item = client.create_item(
schema=schema,
category=category,
description=description.strip(),
projects=[project_code] if project_code else None,
sourcing_type=source,
sourcing_link=sourcing_link.strip(),
standard_cost=unit_cost if unit_cost else None,
)
final_pn = created_item.get("part_number", manual_pn)
except RuntimeError as e:
dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox")
return False
else:
# Auto-generate PN
try:
created_item = client.create_item(
schema=schema,
category=category,
description=description.strip(),
projects=[project_code] if project_code else None,
sourcing_type=source,
sourcing_link=sourcing_link.strip(),
standard_cost=unit_cost if unit_cost else None,
)
final_pn = created_item.get("part_number", "")
except RuntimeError as e:
dialogs._msgbox(None, "Add Item Failed", str(e), box_type="errorbox")
return False
if not final_pn:
dialogs._msgbox(
None, "Add Item", "No part number returned.", box_type="errorbox"
)
return False
# Auto-tag with project if needed
if project_code and created_item and not use_existing_item:
try:
client.add_item_projects(final_pn, [project_code])
except RuntimeError:
pass
# -- Insert row into sheet ----------------------------------------------
_insert_bom_row(
sheet,
insert_row,
pn=final_pn,
description=created_item.get("description", description.strip())
if created_item
else description.strip(),
unit_cost=unit_cost,
qty=qty,
sourcing_link=sourcing_link.strip(),
schema=schema,
status=sync_engine.STATUS_NEW,
parent_pn="",
)
return True
def _insert_bom_row(
sheet,
row: int,
pn: str,
description: str,
source: str,
unit_cost: float,
qty: float,
sourcing_link: str,
schema: str,
status: str,
parent_pn: str,
):
"""Write a single BOM row at the given position with sync tracking."""
from . import pull as _pull # avoid circular import at module level
_pull._set_cell_string(sheet, sf.COL_ITEM, row, "")
_pull._set_cell_string(sheet, sf.COL_LEVEL, row, "")
_pull._set_cell_string(sheet, sf.COL_SOURCE, row, source)
_pull._set_cell_string(sheet, sf.COL_PN, row, pn)
_pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, description)
_pull._set_cell_string(sheet, sf.COL_SELLER_DESC, row, "")
if unit_cost:
_pull._set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost)
_pull._set_cell_float(sheet, sf.COL_QTY, row, qty)
# Ext Cost formula
ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}"
_pull._set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula)
_pull._set_cell_string(sheet, sf.COL_SOURCING_LINK, row, sourcing_link)
_pull._set_cell_string(sheet, sf.COL_SCHEMA, row, schema)
# Build row cells for hash computation
row_cells = [""] * sf.BOM_TOTAL_COLS
row_cells[sf.COL_SOURCE] = source
row_cells[sf.COL_PN] = pn
row_cells[sf.COL_DESCRIPTION] = description
row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else ""
row_cells[sf.COL_QTY] = str(qty)
row_cells[sf.COL_SOURCING_LINK] = sourcing_link
row_cells[sf.COL_SCHEMA] = schema
sync_engine.update_row_sync_state(row_cells, status, parent_pn=parent_pn)
_pull._set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH])
_pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS])
_pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT])
_pull._set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN])
# Colour the row
color = _pull._STATUS_COLORS.get(status)
if color:
_pull._set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, color)

View File

@@ -1,667 +0,0 @@
"""UNO dialogs for the Silo Calc extension.
Provides login, settings, push summary, and PN conflict resolution dialogs.
All dialogs use the UNO dialog toolkit (``com.sun.star.awt``).
"""
from typing import Any, Dict, List, Optional, Tuple
# UNO imports are only available inside LibreOffice
try:
import uno
_HAS_UNO = True
except ImportError:
_HAS_UNO = False
from . import settings as _settings
from .client import SiloClient
def _get_desktop():
"""Return the XSCRIPTCONTEXT desktop, or resolve via component context."""
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx)
def _msgbox(parent, title: str, message: str, box_type="infobox"):
"""Show a simple message box."""
if not _HAS_UNO:
return
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
if parent is None:
parent = _get_desktop().getCurrentFrame().getContainerWindow()
mbt = uno.Enum(
"com.sun.star.awt.MessageBoxType",
"INFOBOX" if box_type == "infobox" else "ERRORBOX",
)
msg_box = toolkit.createMessageBox(parent, mbt, 1, title, message)
msg_box.execute()
def _input_box(
title: str, label: str, default: str = "", password: bool = False
) -> Optional[str]:
"""Show a simple single-field input dialog. Returns None on cancel."""
if not _HAS_UNO:
return None
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
dlg_provider = smgr.createInstanceWithContext(
"com.sun.star.awt.DialogProvider", ctx
)
# Build dialog model programmatically
dlg_model = smgr.createInstanceWithContext(
"com.sun.star.awt.UnoControlDialogModel", ctx
)
dlg_model.Width = 220
dlg_model.Height = 80
dlg_model.Title = title
# Label
lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl.Name = "lbl"
lbl.PositionX = 10
lbl.PositionY = 10
lbl.Width = 200
lbl.Height = 12
lbl.Label = label
dlg_model.insertByName("lbl", lbl)
# Text field
tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf.Name = "tf"
tf.PositionX = 10
tf.PositionY = 24
tf.Width = 200
tf.Height = 14
tf.Text = default
if password:
tf.EchoChar = ord("*")
dlg_model.insertByName("tf", tf)
# OK button
btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_ok.Name = "btn_ok"
btn_ok.PositionX = 110
btn_ok.PositionY = 50
btn_ok.Width = 45
btn_ok.Height = 16
btn_ok.Label = "OK"
btn_ok.PushButtonType = 1 # OK
dlg_model.insertByName("btn_ok", btn_ok)
# Cancel button
btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_cancel.Name = "btn_cancel"
btn_cancel.PositionX = 160
btn_cancel.PositionY = 50
btn_cancel.Width = 45
btn_cancel.Height = 16
btn_cancel.Label = "Cancel"
btn_cancel.PushButtonType = 2 # CANCEL
dlg_model.insertByName("btn_cancel", btn_cancel)
# Create dialog control
dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx)
dlg.setModel(dlg_model)
dlg.setVisible(False)
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
dlg.createPeer(toolkit, None)
result = dlg.execute()
if result == 1: # OK
text = dlg.getControl("tf").getText()
dlg.dispose()
return text
dlg.dispose()
return None
# ---------------------------------------------------------------------------
# Login dialog
# ---------------------------------------------------------------------------
def show_login_dialog(parent=None) -> bool:
"""Two-step login: username then password. Returns True on success."""
username = _input_box("Silo Login", "Username:")
if not username:
return False
password = _input_box("Silo Login", f"Password for {username}:", password=True)
if not password:
return False
client = SiloClient()
try:
result = client.login(username, password)
_msgbox(
parent,
"Silo Login",
f"Logged in as {result['username']} ({result.get('role', 'viewer')})",
)
return True
except RuntimeError as e:
_msgbox(parent, "Silo Login Failed", str(e), box_type="errorbox")
return False
# ---------------------------------------------------------------------------
# Settings dialog
# ---------------------------------------------------------------------------
def show_settings_dialog(parent=None) -> bool:
"""Show the settings dialog. Returns True if saved."""
if not _HAS_UNO:
return False
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
cfg = _settings.load()
dlg_model = smgr.createInstanceWithContext(
"com.sun.star.awt.UnoControlDialogModel", ctx
)
dlg_model.Width = 300
dlg_model.Height = 200
dlg_model.Title = "Silo Settings"
fields = [
("API URL", "api_url", cfg.get("api_url", "")),
("API Token", "api_token", cfg.get("api_token", "")),
("SSL Cert Path", "ssl_cert_path", cfg.get("ssl_cert_path", "")),
("Projects Dir", "projects_dir", cfg.get("projects_dir", "")),
("Default Schema", "default_schema", cfg.get("default_schema", "kindred-rd")),
]
y = 10
for label_text, name, default in fields:
lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl.Name = f"lbl_{name}"
lbl.PositionX = 10
lbl.PositionY = y
lbl.Width = 80
lbl.Height = 12
lbl.Label = label_text
dlg_model.insertByName(f"lbl_{name}", lbl)
tf = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf.Name = f"tf_{name}"
tf.PositionX = 95
tf.PositionY = y
tf.Width = 195
tf.Height = 14
tf.Text = default
dlg_model.insertByName(f"tf_{name}", tf)
y += 22
# SSL verify checkbox
cb = dlg_model.createInstance("com.sun.star.awt.UnoControlCheckBoxModel")
cb.Name = "cb_ssl_verify"
cb.PositionX = 95
cb.PositionY = y
cb.Width = 120
cb.Height = 14
cb.Label = "Verify SSL"
cb.State = 1 if cfg.get("ssl_verify", True) else 0
dlg_model.insertByName("cb_ssl_verify", cb)
y += 22
# --- OpenRouter AI section ---
lbl_ai = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_ai.Name = "lbl_ai_section"
lbl_ai.PositionX = 10
lbl_ai.PositionY = y
lbl_ai.Width = 280
lbl_ai.Height = 12
lbl_ai.Label = "--- OpenRouter AI ---"
dlg_model.insertByName("lbl_ai_section", lbl_ai)
y += 16
# API Key (masked)
lbl_key = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_key.Name = "lbl_openrouter_api_key"
lbl_key.PositionX = 10
lbl_key.PositionY = y
lbl_key.Width = 80
lbl_key.Height = 12
lbl_key.Label = "API Key"
dlg_model.insertByName("lbl_openrouter_api_key", lbl_key)
tf_key = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf_key.Name = "tf_openrouter_api_key"
tf_key.PositionX = 95
tf_key.PositionY = y
tf_key.Width = 195
tf_key.Height = 14
tf_key.Text = cfg.get("openrouter_api_key", "")
tf_key.EchoChar = ord("*")
dlg_model.insertByName("tf_openrouter_api_key", tf_key)
y += 22
# AI Model
lbl_model = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_model.Name = "lbl_openrouter_model"
lbl_model.PositionX = 10
lbl_model.PositionY = y
lbl_model.Width = 80
lbl_model.Height = 12
lbl_model.Label = "AI Model"
dlg_model.insertByName("lbl_openrouter_model", lbl_model)
tf_model = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf_model.Name = "tf_openrouter_model"
tf_model.PositionX = 95
tf_model.PositionY = y
tf_model.Width = 195
tf_model.Height = 14
tf_model.Text = cfg.get("openrouter_model", "")
tf_model.HelpText = "openai/gpt-4.1-nano"
dlg_model.insertByName("tf_openrouter_model", tf_model)
y += 22
# AI Instructions (multi-line)
lbl_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_instr.Name = "lbl_openrouter_instructions"
lbl_instr.PositionX = 10
lbl_instr.PositionY = y
lbl_instr.Width = 80
lbl_instr.Height = 12
lbl_instr.Label = "AI Instructions"
dlg_model.insertByName("lbl_openrouter_instructions", lbl_instr)
tf_instr = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf_instr.Name = "tf_openrouter_instructions"
tf_instr.PositionX = 95
tf_instr.PositionY = y
tf_instr.Width = 195
tf_instr.Height = 56
tf_instr.Text = cfg.get("openrouter_instructions", "")
tf_instr.MultiLine = True
tf_instr.VScroll = True
tf_instr.HelpText = "Custom system prompt (leave blank for default)"
dlg_model.insertByName("tf_openrouter_instructions", tf_instr)
y += 62
# Test connection button
btn_test = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_test.Name = "btn_test"
btn_test.PositionX = 10
btn_test.PositionY = y
btn_test.Width = 80
btn_test.Height = 16
btn_test.Label = "Test Connection"
dlg_model.insertByName("btn_test", btn_test)
# Status label
lbl_status = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_status.Name = "lbl_status"
lbl_status.PositionX = 95
lbl_status.PositionY = y + 2
lbl_status.Width = 195
lbl_status.Height = 12
lbl_status.Label = ""
dlg_model.insertByName("lbl_status", lbl_status)
y += 22
# OK / Cancel
btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_ok.Name = "btn_ok"
btn_ok.PositionX = 190
btn_ok.PositionY = y
btn_ok.Width = 45
btn_ok.Height = 16
btn_ok.Label = "Save"
btn_ok.PushButtonType = 1
dlg_model.insertByName("btn_ok", btn_ok)
btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_cancel.Name = "btn_cancel"
btn_cancel.PositionX = 240
btn_cancel.PositionY = y
btn_cancel.Width = 45
btn_cancel.Height = 16
btn_cancel.Label = "Cancel"
btn_cancel.PushButtonType = 2
dlg_model.insertByName("btn_cancel", btn_cancel)
dlg_model.Height = y + 26
dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx)
dlg.setModel(dlg_model)
dlg.setVisible(False)
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
dlg.createPeer(toolkit, None)
result = dlg.execute()
if result == 1:
for _, name, _ in fields:
cfg[name] = dlg.getControl(f"tf_{name}").getText()
cfg["ssl_verify"] = bool(dlg.getControl("cb_ssl_verify").getModel().State)
cfg["openrouter_api_key"] = dlg.getControl("tf_openrouter_api_key").getText()
cfg["openrouter_model"] = dlg.getControl("tf_openrouter_model").getText()
cfg["openrouter_instructions"] = dlg.getControl(
"tf_openrouter_instructions"
).getText()
_settings.save(cfg)
dlg.dispose()
return True
dlg.dispose()
return False
# ---------------------------------------------------------------------------
# Push summary dialog
# ---------------------------------------------------------------------------
def show_push_summary(
new_count: int,
modified_count: int,
conflict_count: int,
unchanged_count: int,
parent=None,
) -> bool:
"""Show push summary and return True if user confirms."""
lines = [
f"New items: {new_count}",
f"Modified items: {modified_count}",
f"Conflicts: {conflict_count}",
f"Unchanged: {unchanged_count}",
]
if conflict_count:
lines.append("\nConflicts must be resolved before pushing.")
msg = "\n".join(lines)
if conflict_count:
_msgbox(parent, "Silo Push -- Conflicts Found", msg, box_type="errorbox")
return False
if new_count == 0 and modified_count == 0:
_msgbox(parent, "Silo Push", "Nothing to push -- all rows are up to date.")
return False
# Confirmation -- for now use a simple info box (OK = proceed)
_msgbox(parent, "Silo Push", f"Ready to push:\n\n{msg}\n\nProceed?")
return True
# ---------------------------------------------------------------------------
# PN Conflict Resolution dialog
# ---------------------------------------------------------------------------
# Return values
PN_USE_EXISTING = "use_existing"
PN_CREATE_NEW = "create_new"
PN_CANCEL = "cancel"
def show_pn_conflict_dialog(
part_number: str,
existing_item: Dict[str, Any],
parent=None,
) -> str:
"""Show PN conflict dialog when a manually entered PN already exists.
Returns one of: PN_USE_EXISTING, PN_CREATE_NEW, PN_CANCEL.
"""
if not _HAS_UNO:
return PN_CANCEL
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
dlg_model = smgr.createInstanceWithContext(
"com.sun.star.awt.UnoControlDialogModel", ctx
)
dlg_model.Width = 320
dlg_model.Height = 220
dlg_model.Title = f"Part Number Conflict: {part_number}"
y = 10
info_lines = [
"This part number already exists in Silo:",
"",
f" Description: {existing_item.get('description', '')}",
f" Type: {existing_item.get('item_type', '')}",
f" Category: {existing_item.get('part_number', '')[:3]}",
f" Sourcing: {existing_item.get('sourcing_type', '')}",
f" Cost: ${existing_item.get('standard_cost', 0):.2f}",
]
for line in info_lines:
lbl = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl.Name = f"info_{y}"
lbl.PositionX = 10
lbl.PositionY = y
lbl.Width = 300
lbl.Height = 12
lbl.Label = line
dlg_model.insertByName(f"info_{y}", lbl)
y += 13
y += 5
# Radio buttons
rb_use = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel")
rb_use.Name = "rb_use"
rb_use.PositionX = 20
rb_use.PositionY = y
rb_use.Width = 280
rb_use.Height = 14
rb_use.Label = "Use existing item (add to BOM)"
rb_use.State = 1 # selected by default
dlg_model.insertByName("rb_use", rb_use)
y += 18
rb_new = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel")
rb_new.Name = "rb_new"
rb_new.PositionX = 20
rb_new.PositionY = y
rb_new.Width = 280
rb_new.Height = 14
rb_new.Label = "Create new item (auto-generate PN)"
rb_new.State = 0
dlg_model.insertByName("rb_new", rb_new)
y += 18
rb_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlRadioButtonModel")
rb_cancel.Name = "rb_cancel"
rb_cancel.PositionX = 20
rb_cancel.PositionY = y
rb_cancel.Width = 280
rb_cancel.Height = 14
rb_cancel.Label = "Cancel"
rb_cancel.State = 0
dlg_model.insertByName("rb_cancel", rb_cancel)
y += 25
# OK button
btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_ok.Name = "btn_ok"
btn_ok.PositionX = 210
btn_ok.PositionY = y
btn_ok.Width = 45
btn_ok.Height = 16
btn_ok.Label = "OK"
btn_ok.PushButtonType = 1
dlg_model.insertByName("btn_ok", btn_ok)
btn_cancel_btn = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_cancel_btn.Name = "btn_cancel_btn"
btn_cancel_btn.PositionX = 260
btn_cancel_btn.PositionY = y
btn_cancel_btn.Width = 45
btn_cancel_btn.Height = 16
btn_cancel_btn.Label = "Cancel"
btn_cancel_btn.PushButtonType = 2
dlg_model.insertByName("btn_cancel_btn", btn_cancel_btn)
dlg_model.Height = y + 26
dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx)
dlg.setModel(dlg_model)
dlg.setVisible(False)
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
dlg.createPeer(toolkit, None)
result = dlg.execute()
if result != 1:
dlg.dispose()
return PN_CANCEL
if dlg.getControl("rb_use").getModel().State:
dlg.dispose()
return PN_USE_EXISTING
if dlg.getControl("rb_new").getModel().State:
dlg.dispose()
return PN_CREATE_NEW
dlg.dispose()
return PN_CANCEL
# ---------------------------------------------------------------------------
# AI Description review dialog
# ---------------------------------------------------------------------------
def show_ai_description_dialog(
seller_description: str, ai_description: str, parent=None
) -> Optional[str]:
"""Show AI-generated description for review/editing.
Side-by-side layout: seller description (read-only) on the left,
AI-generated description (editable) on the right.
Returns the accepted/edited description text, or None on cancel.
"""
if not _HAS_UNO:
return None
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
dlg_model = smgr.createInstanceWithContext(
"com.sun.star.awt.UnoControlDialogModel", ctx
)
dlg_model.Width = 400
dlg_model.Height = 210
dlg_model.Title = "AI Description Review"
# Left: Seller Description (read-only)
lbl_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_seller.Name = "lbl_seller"
lbl_seller.PositionX = 10
lbl_seller.PositionY = 8
lbl_seller.Width = 185
lbl_seller.Height = 12
lbl_seller.Label = "Seller Description"
dlg_model.insertByName("lbl_seller", lbl_seller)
tf_seller = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf_seller.Name = "tf_seller"
tf_seller.PositionX = 10
tf_seller.PositionY = 22
tf_seller.Width = 185
tf_seller.Height = 140
tf_seller.Text = seller_description
tf_seller.MultiLine = True
tf_seller.VScroll = True
tf_seller.ReadOnly = True
dlg_model.insertByName("tf_seller", tf_seller)
# Right: Generated Description (editable)
lbl_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlFixedTextModel")
lbl_gen.Name = "lbl_gen"
lbl_gen.PositionX = 205
lbl_gen.PositionY = 8
lbl_gen.Width = 185
lbl_gen.Height = 12
lbl_gen.Label = "Generated Description (editable)"
dlg_model.insertByName("lbl_gen", lbl_gen)
tf_gen = dlg_model.createInstance("com.sun.star.awt.UnoControlEditModel")
tf_gen.Name = "tf_gen"
tf_gen.PositionX = 205
tf_gen.PositionY = 22
tf_gen.Width = 185
tf_gen.Height = 140
tf_gen.Text = ai_description
tf_gen.MultiLine = True
tf_gen.VScroll = True
dlg_model.insertByName("tf_gen", tf_gen)
# Accept button
btn_ok = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_ok.Name = "btn_ok"
btn_ok.PositionX = 290
btn_ok.PositionY = 175
btn_ok.Width = 50
btn_ok.Height = 18
btn_ok.Label = "Accept"
btn_ok.PushButtonType = 1 # OK
dlg_model.insertByName("btn_ok", btn_ok)
# Cancel button
btn_cancel = dlg_model.createInstance("com.sun.star.awt.UnoControlButtonModel")
btn_cancel.Name = "btn_cancel"
btn_cancel.PositionX = 345
btn_cancel.PositionY = 175
btn_cancel.Width = 45
btn_cancel.Height = 18
btn_cancel.Label = "Cancel"
btn_cancel.PushButtonType = 2 # CANCEL
dlg_model.insertByName("btn_cancel", btn_cancel)
dlg = smgr.createInstanceWithContext("com.sun.star.awt.UnoControlDialog", ctx)
dlg.setModel(dlg_model)
dlg.setVisible(False)
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
dlg.createPeer(toolkit, None)
result = dlg.execute()
if result == 1: # OK / Accept
text = dlg.getControl("tf_gen").getText()
dlg.dispose()
return text
dlg.dispose()
return None
# ---------------------------------------------------------------------------
# Assembly / Project picker dialogs
# ---------------------------------------------------------------------------
def show_assembly_picker(client: SiloClient, parent=None) -> Optional[str]:
"""Show a dialog to pick an assembly by PN. Returns the PN or None."""
pn = _input_box("Pull BOM", "Assembly part number (e.g. A01-0003):")
return pn if pn and pn.strip() else None
def show_project_picker(client: SiloClient, parent=None) -> Optional[str]:
"""Show a dialog to pick a project code. Returns the code or None."""
try:
projects = client.get_projects()
except RuntimeError:
projects = []
if not projects:
code = _input_box("Pull Project", "Project code:")
return code if code and code.strip() else None
# Build a choice list
choices = [f"{p.get('code', '')} - {p.get('name', '')}" for p in projects]
# For simplicity, use an input box with hint. A proper list picker
# would use a ListBox control, but this is functional for now.
hint = "Available: " + ", ".join(p.get("code", "") for p in projects)
code = _input_box("Pull Project", f"Project code ({hint}):")
return code if code and code.strip() else None

View File

@@ -1,76 +0,0 @@
"""Local project file management for ODS workbooks.
Mirrors the FreeCAD file path pattern from ``pkg/freecad/silo_commands.py``.
Project ODS files live at::
~/projects/sheets/{PROJECT_CODE}/{PROJECT_CODE}.ods
The ``SILO_PROJECTS_DIR`` env var (shared with the FreeCAD workbench)
controls the base directory.
"""
import os
from pathlib import Path
from typing import Optional
from . import settings as _settings
def get_sheets_dir() -> Path:
"""Return the base directory for ODS project sheets."""
return _settings.get_projects_dir() / "sheets"
def get_project_sheet_path(project_code: str) -> Path:
"""Canonical path for a project workbook.
Example: ``~/projects/sheets/3DX10/3DX10.ods``
"""
return get_sheets_dir() / project_code / f"{project_code}.ods"
def ensure_project_dir(project_code: str) -> Path:
"""Create the project sheet directory if needed and return its path."""
d = get_sheets_dir() / project_code
d.mkdir(parents=True, exist_ok=True)
return d
def project_sheet_exists(project_code: str) -> bool:
"""Check whether a project workbook already exists locally."""
return get_project_sheet_path(project_code).is_file()
def save_project_sheet(project_code: str, ods_bytes: bytes) -> Path:
"""Write ODS bytes to the canonical project path.
Returns the Path written to.
"""
ensure_project_dir(project_code)
path = get_project_sheet_path(project_code)
with open(path, "wb") as f:
f.write(ods_bytes)
return path
def read_project_sheet(project_code: str) -> Optional[bytes]:
"""Read ODS bytes from the canonical project path, or None."""
path = get_project_sheet_path(project_code)
if not path.is_file():
return None
with open(path, "rb") as f:
return f.read()
def list_project_sheets() -> list:
"""Return a list of (project_code, path) tuples for all local sheets."""
sheets_dir = get_sheets_dir()
results = []
if not sheets_dir.is_dir():
return results
for entry in sorted(sheets_dir.iterdir()):
if entry.is_dir():
ods = entry / f"{entry.name}.ods"
if ods.is_file():
results.append((entry.name, ods))
return results

View File

@@ -1,542 +0,0 @@
"""Pull commands -- populate LibreOffice Calc sheets from Silo API data.
This module handles the UNO cell-level work for SiloPullBOM and
SiloPullProject. It fetches data via the SiloClient, then writes
cells with proper formatting, formulas, hidden columns, and row
hash tracking.
"""
from typing import Any, Dict, List, Optional
from . import sheet_format as sf
from . import sync_engine
from .client import SiloClient
# UNO imports -- only available inside LibreOffice
try:
import uno
from com.sun.star.beans import PropertyValue
from com.sun.star.table import CellHoriJustify
_HAS_UNO = True
except ImportError:
_HAS_UNO = False
# ---------------------------------------------------------------------------
# Colour helpers (UNO uses 0xRRGGBB integers)
# ---------------------------------------------------------------------------
def _rgb_int(r: int, g: int, b: int) -> int:
return (r << 16) | (g << 8) | b
_HEADER_BG = _rgb_int(68, 114, 196) # steel blue
_HEADER_FG = _rgb_int(255, 255, 255) # white text
_STATUS_COLORS = {k: _rgb_int(*v) for k, v in sf.STATUS_COLORS.items()}
# ---------------------------------------------------------------------------
# Cell writing helpers
# ---------------------------------------------------------------------------
def _set_cell_string(sheet, col: int, row: int, value: str):
cell = sheet.getCellByPosition(col, row)
cell.setString(str(value) if value else "")
def _set_cell_float(sheet, col: int, row: int, value, fmt: str = ""):
cell = sheet.getCellByPosition(col, row)
try:
cell.setValue(float(value))
except (ValueError, TypeError):
cell.setString(str(value) if value else "")
def _set_cell_formula(sheet, col: int, row: int, formula: str):
cell = sheet.getCellByPosition(col, row)
cell.setFormula(formula)
def _set_row_bg(sheet, row: int, col_count: int, color: int):
"""Set background colour on an entire row."""
rng = sheet.getCellRangeByPosition(0, row, col_count - 1, row)
rng.CellBackColor = color
def _format_header_row(sheet, col_count: int):
"""Bold white text on blue background for row 0."""
rng = sheet.getCellRangeByPosition(0, 0, col_count - 1, 0)
rng.CellBackColor = _HEADER_BG
rng.CharColor = _HEADER_FG
rng.CharWeight = 150 # com.sun.star.awt.FontWeight.BOLD
def _freeze_row(doc, row: int = 1):
"""Freeze panes at the given row (default: freeze header)."""
ctrl = doc.getCurrentController()
ctrl.freezeAtPosition(0, row)
def _hide_columns(sheet, start_col: int, end_col: int):
"""Hide a range of columns (inclusive)."""
cols = sheet.getColumns()
for i in range(start_col, end_col):
col = cols.getByIndex(i)
col.IsVisible = False
def _set_column_width(sheet, col: int, width_mm100: int):
"""Set column width in 1/100 mm."""
cols = sheet.getColumns()
c = cols.getByIndex(col)
c.Width = width_mm100
# ---------------------------------------------------------------------------
# BOM data helpers
# ---------------------------------------------------------------------------
def _get_meta(entry: Dict, key: str, default: str = "") -> str:
"""Extract a value from a BOM entry's metadata dict."""
meta = entry.get("metadata") or {}
val = meta.get(key, default)
return str(val) if val else default
def _get_meta_float(entry: Dict, key: str) -> Optional[float]:
meta = entry.get("metadata") or {}
val = meta.get(key)
if val is not None:
try:
return float(val)
except (ValueError, TypeError):
pass
return None
def _get_property(rev: Optional[Dict], key: str) -> str:
"""Extract a property from a revision's properties dict."""
if not rev:
return ""
props = rev.get("properties") or {}
val = props.get(key, "")
return str(val) if val else ""
# ---------------------------------------------------------------------------
# SiloPullBOM
# ---------------------------------------------------------------------------
def pull_bom(
client: SiloClient,
doc,
sheet,
assembly_pn: str,
project_code: str = "",
schema: str = "kindred-rd",
):
"""Fetch an expanded BOM and populate *sheet* with formatted data.
Parameters
----------
client : SiloClient
doc : XSpreadsheetDocument
sheet : XSpreadsheet (the target sheet to populate)
assembly_pn : str (top-level assembly part number)
project_code : str (project code for auto-tagging, optional)
schema : str
"""
if not _HAS_UNO:
raise RuntimeError("UNO API not available -- must run inside LibreOffice")
# Fetch expanded BOM
bom_entries = client.get_bom_expanded(assembly_pn, depth=10)
if not bom_entries:
raise RuntimeError(f"No BOM entries found for {assembly_pn}")
# Fetch the top-level item for the assembly name
try:
top_item = client.get_item(assembly_pn)
except RuntimeError:
top_item = {}
# Build a cache of items and their latest revisions for property lookup
item_cache: Dict[str, Dict] = {}
rev_cache: Dict[str, Dict] = {}
def _ensure_cached(pn: str):
if pn in item_cache:
return
try:
item_cache[pn] = client.get_item(pn)
except RuntimeError:
item_cache[pn] = {}
try:
revisions = client.get_revisions(pn)
if revisions:
rev_cache[pn] = revisions[0] # newest first
except RuntimeError:
pass
# Pre-cache all items in the BOM
all_pns = set()
for e in bom_entries:
all_pns.add(e.get("child_part_number", ""))
all_pns.add(e.get("parent_part_number", ""))
all_pns.discard("")
for pn in all_pns:
_ensure_cached(pn)
# -- Write header row ---------------------------------------------------
for col_idx, header in enumerate(sf.BOM_ALL_HEADERS):
_set_cell_string(sheet, col_idx, 0, header)
_format_header_row(sheet, sf.BOM_TOTAL_COLS)
# -- Group entries by parent for section headers ------------------------
# BOM entries come back in tree order (parent then children).
# We insert section header rows for each depth-1 sub-assembly.
row = 1 # current write row (0 is header)
prev_parent = None
for entry in bom_entries:
depth = entry.get("depth", 0)
child_pn = entry.get("child_part_number", "")
parent_pn = entry.get("parent_part_number", "")
child_item = item_cache.get(child_pn, {})
child_rev = rev_cache.get(child_pn)
# Section header: when the parent changes for depth >= 1 entries
if depth == 1 and parent_pn != prev_parent and parent_pn:
if row > 1:
# Blank separator row
row += 1
# Sub-assembly label row
parent_item = item_cache.get(parent_pn, {})
label = parent_item.get("description", parent_pn)
_set_cell_string(sheet, sf.COL_ITEM, row, label)
_set_cell_float(sheet, sf.COL_LEVEL, row, 0)
_set_cell_string(sheet, sf.COL_SOURCE, row, "M")
_set_cell_string(sheet, sf.COL_PN, row, parent_pn)
# Compute sub-assembly cost from children if available
parent_cost = _compute_subassembly_cost(bom_entries, parent_pn, item_cache)
if parent_cost is not None:
_set_cell_float(sheet, sf.COL_UNIT_COST, row, parent_cost)
_set_cell_float(sheet, sf.COL_QTY, row, 1)
# Ext Cost formula
ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}"
_set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula)
_set_cell_string(sheet, sf.COL_SCHEMA, row, schema)
# Sync tracking for parent row
parent_cells = [""] * sf.BOM_TOTAL_COLS
parent_cells[sf.COL_ITEM] = label
parent_cells[sf.COL_LEVEL] = "0"
parent_cells[sf.COL_SOURCE] = "M"
parent_cells[sf.COL_PN] = parent_pn
parent_cells[sf.COL_SCHEMA] = schema
sync_engine.update_row_sync_state(
parent_cells,
sync_engine.STATUS_SYNCED,
updated_at=parent_item.get("updated_at", ""),
parent_pn="",
)
_set_cell_string(sheet, sf.COL_ROW_HASH, row, parent_cells[sf.COL_ROW_HASH])
_set_cell_string(
sheet, sf.COL_ROW_STATUS, row, parent_cells[sf.COL_ROW_STATUS]
)
_set_cell_string(
sheet, sf.COL_UPDATED_AT, row, parent_cells[sf.COL_UPDATED_AT]
)
_set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"])
prev_parent = parent_pn
row += 1
# -- Write child row -----------------------------------------------
quantity = entry.get("quantity")
unit_cost = _get_meta_float(entry, "unit_cost")
if unit_cost is None:
unit_cost = child_item.get("standard_cost")
# Item column: blank for children (name is in the section header)
_set_cell_string(sheet, sf.COL_ITEM, row, "")
_set_cell_float(sheet, sf.COL_LEVEL, row, depth)
_set_cell_string(sheet, sf.COL_SOURCE, row, child_item.get("sourcing_type", ""))
_set_cell_string(sheet, sf.COL_PN, row, child_pn)
_set_cell_string(
sheet, sf.COL_DESCRIPTION, row, child_item.get("description", "")
)
_set_cell_string(
sheet, sf.COL_SELLER_DESC, row, _get_meta(entry, "seller_description")
)
if unit_cost is not None:
_set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost)
if quantity is not None:
_set_cell_float(sheet, sf.COL_QTY, row, quantity)
# Ext Cost formula
ext_formula = f"={sf.col_letter(sf.COL_UNIT_COST)}{row + 1}*{sf.col_letter(sf.COL_QTY)}{row + 1}"
_set_cell_formula(sheet, sf.COL_EXT_COST, row, ext_formula)
_set_cell_string(
sheet, sf.COL_SOURCING_LINK, row, child_item.get("sourcing_link", "")
)
_set_cell_string(sheet, sf.COL_SCHEMA, row, schema)
# -- Property columns -----------------------------------------------
prop_values = _build_property_cells(child_item, child_rev, entry)
for i, val in enumerate(prop_values):
if val:
_set_cell_string(sheet, sf.COL_PROP_START + i, row, val)
# -- Sync tracking ---------------------------------------------------
row_cells = [""] * sf.BOM_TOTAL_COLS
row_cells[sf.COL_LEVEL] = str(depth)
row_cells[sf.COL_SOURCE] = child_item.get("sourcing_type", "")
row_cells[sf.COL_PN] = child_pn
row_cells[sf.COL_DESCRIPTION] = child_item.get("description", "")
row_cells[sf.COL_SELLER_DESC] = _get_meta(entry, "seller_description")
row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else ""
row_cells[sf.COL_QTY] = str(quantity) if quantity else ""
row_cells[sf.COL_SOURCING_LINK] = child_item.get("sourcing_link", "")
row_cells[sf.COL_SCHEMA] = schema
for i, val in enumerate(prop_values):
row_cells[sf.COL_PROP_START + i] = val
sync_engine.update_row_sync_state(
row_cells,
sync_engine.STATUS_SYNCED,
updated_at=child_item.get("updated_at", ""),
parent_pn=parent_pn,
)
_set_cell_string(sheet, sf.COL_ROW_HASH, row, row_cells[sf.COL_ROW_HASH])
_set_cell_string(sheet, sf.COL_ROW_STATUS, row, row_cells[sf.COL_ROW_STATUS])
_set_cell_string(sheet, sf.COL_UPDATED_AT, row, row_cells[sf.COL_UPDATED_AT])
_set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN])
_set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"])
row += 1
# -- Formatting ---------------------------------------------------------
_freeze_row(doc, 1)
_hide_columns(sheet, sf.COL_PROP_START, sf.COL_PROP_END) # property cols
_hide_columns(sheet, sf.COL_SYNC_START, sf.BOM_TOTAL_COLS) # sync cols
# Set reasonable column widths for visible columns (in 1/100 mm)
_WIDTHS = {
sf.COL_ITEM: 4500,
sf.COL_LEVEL: 1200,
sf.COL_SOURCE: 1500,
sf.COL_PN: 2500,
sf.COL_DESCRIPTION: 5000,
sf.COL_SELLER_DESC: 6000,
sf.COL_UNIT_COST: 2200,
sf.COL_QTY: 1200,
sf.COL_EXT_COST: 2200,
sf.COL_SOURCING_LINK: 5000,
sf.COL_SCHEMA: 1500,
}
for col, width in _WIDTHS.items():
_set_column_width(sheet, col, width)
# Auto-tag all items with the project (if a project code is set)
if project_code:
_auto_tag_project(client, all_pns, project_code)
return row - 1 # number of data rows written
def _compute_subassembly_cost(
bom_entries: List[Dict],
parent_pn: str,
item_cache: Dict[str, Dict],
) -> Optional[float]:
"""Sum unit_cost * quantity for direct children of parent_pn."""
total = 0.0
found = False
for e in bom_entries:
if e.get("parent_part_number") == parent_pn and e.get("depth", 0) > 0:
q = e.get("quantity") or 0
uc = _get_meta_float(e, "unit_cost")
if uc is None:
child = item_cache.get(e.get("child_part_number", ""), {})
uc = child.get("standard_cost")
if uc is not None:
total += float(uc) * float(q)
found = True
return total if found else None
def _build_property_cells(
item: Dict, rev: Optional[Dict], bom_entry: Dict
) -> List[str]:
"""Build the property column values in order matching BOM_PROPERTY_HEADERS.
Sources (priority): revision properties > BOM metadata > item fields.
"""
result = []
for header in sf.BOM_PROPERTY_HEADERS:
db_key = sf.PROPERTY_KEY_MAP.get(header, "")
val = ""
# Check revision properties first
if db_key:
val = _get_property(rev, db_key)
# Fallback to BOM entry metadata
if not val and db_key:
val = _get_meta(bom_entry, db_key)
# Special case: Long Description from item field
if header == "Long Description" and not val:
val = item.get("long_description", "")
# Special case: Notes from item metadata or revision
if header == "Notes" and not val:
val = _get_meta(bom_entry, "notes")
result.append(str(val) if val else "")
return result
def _auto_tag_project(
client: SiloClient,
part_numbers: set,
project_code: str,
):
"""Tag all part numbers with the given project code (skip failures)."""
for pn in part_numbers:
if not pn:
continue
try:
existing = client.get_item_projects(pn)
existing_codes = (
{p.get("code", "") for p in existing}
if isinstance(existing, list)
else set()
)
if project_code not in existing_codes:
client.add_item_projects(pn, [project_code])
except RuntimeError:
pass # Best-effort tagging
# ---------------------------------------------------------------------------
# SiloPullProject
# ---------------------------------------------------------------------------
def pull_project(
client: SiloClient,
doc,
project_code: str,
schema: str = "kindred-rd",
):
"""Fetch project items and populate an Items sheet.
Also attempts to find an assembly and populate a BOM sheet.
"""
if not _HAS_UNO:
raise RuntimeError("UNO API not available")
items = client.get_project_items(project_code)
if not items:
raise RuntimeError(f"No items found for project {project_code}")
sheets = doc.getSheets()
# -- Items sheet --------------------------------------------------------
if sheets.hasByName("Items"):
items_sheet = sheets.getByName("Items")
else:
sheets.insertNewByName("Items", sheets.getCount())
items_sheet = sheets.getByName("Items")
# Header
for col_idx, header in enumerate(sf.ITEMS_HEADERS):
_set_cell_string(items_sheet, col_idx, 0, header)
header_range = items_sheet.getCellRangeByPosition(
0, 0, len(sf.ITEMS_HEADERS) - 1, 0
)
header_range.CellBackColor = _HEADER_BG
header_range.CharColor = _HEADER_FG
header_range.CharWeight = 150
for row_idx, item in enumerate(items, start=1):
_set_cell_string(items_sheet, 0, row_idx, item.get("part_number", ""))
_set_cell_string(items_sheet, 1, row_idx, item.get("description", ""))
_set_cell_string(items_sheet, 2, row_idx, item.get("item_type", ""))
_set_cell_string(items_sheet, 3, row_idx, item.get("sourcing_type", ""))
_set_cell_string(items_sheet, 4, row_idx, schema)
cost = item.get("standard_cost")
if cost is not None:
_set_cell_float(items_sheet, 5, row_idx, cost)
_set_cell_string(items_sheet, 6, row_idx, item.get("sourcing_link", ""))
_set_cell_string(items_sheet, 7, row_idx, item.get("long_description", ""))
# Properties from latest revision (if available)
rev = None
try:
revisions = client.get_revisions(item.get("part_number", ""))
if revisions:
rev = revisions[0]
except RuntimeError:
pass
prop_cols = [
"manufacturer",
"manufacturer_pn",
"supplier",
"supplier_pn",
"lead_time_days",
"minimum_order_qty",
"lifecycle_status",
"rohs_compliant",
"country_of_origin",
"material",
"finish",
"notes",
]
for pi, prop_key in enumerate(prop_cols):
val = _get_property(rev, prop_key)
if val:
_set_cell_string(items_sheet, 8 + pi, row_idx, val)
_set_cell_string(
items_sheet,
20,
row_idx,
item.get("created_at", "")[:10] if item.get("created_at") else "",
)
_set_cell_string(
items_sheet,
21,
row_idx,
item.get("updated_at", "")[:10] if item.get("updated_at") else "",
)
# Freeze header
_freeze_row(doc, 1)
# -- BOM sheet (if we can find an assembly) -----------------------------
assemblies = [i for i in items if i.get("item_type") == "assembly"]
if assemblies:
top_assembly = assemblies[0]
top_pn = top_assembly.get("part_number", "")
if sheets.hasByName("BOM"):
bom_sheet = sheets.getByName("BOM")
else:
sheets.insertNewByName("BOM", 0)
bom_sheet = sheets.getByName("BOM")
try:
pull_bom(
client, doc, bom_sheet, top_pn, project_code=project_code, schema=schema
)
except RuntimeError:
pass # BOM sheet stays empty if fetch fails
return len(items)

View File

@@ -1,431 +0,0 @@
"""Push command -- sync local BOM edits back to the Silo database.
Handles:
- Row classification (new / modified / synced / conflict)
- Creating new items via the API
- Updating existing items and BOM entry metadata
- Auto-tagging new items with the project code
- Conflict detection against server timestamps
- Updating row sync state after successful push
"""
from typing import Any, Dict, List, Optional, Tuple
from . import sheet_format as sf
from . import sync_engine
from .client import SiloClient
# UNO imports
try:
import uno
_HAS_UNO = True
except ImportError:
_HAS_UNO = False
def _read_sheet_rows(sheet) -> List[List[str]]:
"""Read all rows from a sheet as lists of strings."""
cursor = sheet.createCursor()
cursor.gotoStartOfUsedArea(False)
cursor.gotoEndOfUsedArea(True)
addr = cursor.getRangeAddress()
end_row = addr.EndRow
end_col = max(addr.EndColumn, sf.BOM_TOTAL_COLS - 1)
rows = []
for r in range(end_row + 1):
row_cells = []
for c in range(end_col + 1):
cell = sheet.getCellByPosition(c, r)
# Get display string for all cell types
val = cell.getString()
row_cells.append(val)
# Pad to full width
while len(row_cells) < sf.BOM_TOTAL_COLS:
row_cells.append("")
rows.append(row_cells)
return rows
def _detect_project_code(doc) -> str:
"""Try to detect the project code from the file path."""
try:
file_url = doc.getURL()
if file_url:
file_path = uno.fileUrlToSystemPath(file_url)
parts = file_path.replace("\\", "/").split("/")
if "sheets" in parts:
idx = parts.index("sheets")
if idx + 1 < len(parts):
return parts[idx + 1]
except Exception:
pass
return ""
def _fetch_server_timestamps(
client: SiloClient, part_numbers: List[str]
) -> Dict[str, str]:
"""Fetch updated_at timestamps for a list of part numbers."""
timestamps = {}
for pn in part_numbers:
if not pn:
continue
try:
item = client.get_item(pn)
timestamps[pn] = item.get("updated_at", "")
except RuntimeError:
pass
return timestamps
# ---------------------------------------------------------------------------
# Push execution
# ---------------------------------------------------------------------------
def push_sheet(
client: SiloClient,
doc,
sheet,
schema: str = "kindred-rd",
) -> Dict[str, Any]:
"""Execute a push for the active BOM sheet.
Returns a summary dict with counts and any errors.
"""
if not _HAS_UNO:
raise RuntimeError("UNO API not available")
rows = _read_sheet_rows(sheet)
if not rows:
return {"created": 0, "updated": 0, "errors": [], "skipped": 0}
project_code = _detect_project_code(doc)
# Classify all rows
classified = sync_engine.classify_rows(rows)
# Collect part numbers for server timestamp check
modified_pns = [
cells[sf.COL_PN].strip()
for _, status, cells in classified
if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip()
]
server_ts = _fetch_server_timestamps(client, modified_pns)
# Build diff
diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts)
results = {
"created": 0,
"updated": 0,
"errors": [],
"skipped": diff["unchanged"],
"conflicts": len(diff["conflicts"]),
}
# -- Handle new rows: create items in the database ----------------------
for row_info in diff["new"]:
row_idx = row_info["row_index"]
cells = rows[row_idx]
pn = cells[sf.COL_PN].strip()
desc = cells[sf.COL_DESCRIPTION].strip()
source = cells[sf.COL_SOURCE].strip()
sourcing_link = cells[sf.COL_SOURCING_LINK].strip()
unit_cost_str = cells[sf.COL_UNIT_COST].strip()
qty_str = cells[sf.COL_QTY].strip()
parent_pn = (
cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else ""
)
unit_cost = None
if unit_cost_str:
try:
unit_cost = float(unit_cost_str.replace("$", "").replace(",", ""))
except ValueError:
pass
qty = 1.0
if qty_str:
try:
qty = float(qty_str)
except ValueError:
pass
if not desc:
results["errors"].append(
f"Row {row_idx + 1}: description is required for new items"
)
_set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
continue
try:
if pn:
# Check if item already exists
try:
existing = client.get_item(pn)
# Item exists -- just update BOM relationship if parent is known
if parent_pn:
_update_bom_relationship(
client, parent_pn, pn, qty, unit_cost, cells
)
results["updated"] += 1
_update_row_after_push(sheet, rows, row_idx, existing)
continue
except RuntimeError:
pass # Item doesn't exist, create it
# Detect category from PN prefix (e.g., F01-0001 -> F01)
category = pn[:3] if pn and len(pn) >= 3 else ""
# Create the item
create_data = {
"schema": schema,
"category": category,
"description": desc,
}
if source:
create_data["sourcing_type"] = source
if sourcing_link:
create_data["sourcing_link"] = sourcing_link
if unit_cost is not None:
create_data["standard_cost"] = unit_cost
if project_code:
create_data["projects"] = [project_code]
created = client.create_item(**create_data)
created_pn = created.get("part_number", pn)
# Update the PN cell if it was auto-generated
if not pn and created_pn:
from . import pull as _pull
_pull._set_cell_string(sheet, sf.COL_PN, row_idx, created_pn)
cells[sf.COL_PN] = created_pn
# Add to parent's BOM if parent is known
if parent_pn:
_update_bom_relationship(
client, parent_pn, created_pn, qty, unit_cost, cells
)
# Auto-tag with project
if project_code:
try:
client.add_item_projects(created_pn, [project_code])
except RuntimeError:
pass
# Set property columns via revision update (if any properties set)
_push_properties(client, created_pn, cells)
results["created"] += 1
_update_row_after_push(sheet, rows, row_idx, created)
except RuntimeError as e:
results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}")
_set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
# -- Handle modified rows: update items ---------------------------------
for row_info in diff["modified"]:
row_idx = row_info["row_index"]
cells = rows[row_idx]
pn = cells[sf.COL_PN].strip()
parent_pn = (
cells[sf.COL_PARENT_PN].strip() if len(cells) > sf.COL_PARENT_PN else ""
)
if not pn:
results["errors"].append(
f"Row {row_idx + 1}: no part number for modified row"
)
continue
try:
# Update item fields
update_fields = {}
desc = cells[sf.COL_DESCRIPTION].strip()
if desc:
update_fields["description"] = desc
source = cells[sf.COL_SOURCE].strip()
if source:
update_fields["sourcing_type"] = source
sourcing_link = cells[sf.COL_SOURCING_LINK].strip()
update_fields["sourcing_link"] = sourcing_link
unit_cost_str = cells[sf.COL_UNIT_COST].strip()
unit_cost = None
if unit_cost_str:
try:
unit_cost = float(unit_cost_str.replace("$", "").replace(",", ""))
update_fields["standard_cost"] = unit_cost
except ValueError:
pass
if update_fields:
updated = client.update_item(pn, **update_fields)
else:
updated = client.get_item(pn)
# Update BOM relationship
qty_str = cells[sf.COL_QTY].strip()
qty = 1.0
if qty_str:
try:
qty = float(qty_str)
except ValueError:
pass
if parent_pn:
_update_bom_relationship(client, parent_pn, pn, qty, unit_cost, cells)
# Update properties
_push_properties(client, pn, cells)
# Auto-tag with project
if project_code:
try:
existing_projects = client.get_item_projects(pn)
existing_codes = (
{p.get("code", "") for p in existing_projects}
if isinstance(existing_projects, list)
else set()
)
if project_code not in existing_codes:
client.add_item_projects(pn, [project_code])
except RuntimeError:
pass
results["updated"] += 1
_update_row_after_push(sheet, rows, row_idx, updated)
except RuntimeError as e:
results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}")
_set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
# -- Mark conflicts -----------------------------------------------------
for row_info in diff["conflicts"]:
row_idx = row_info["row_index"]
_set_row_status(sheet, row_idx, sync_engine.STATUS_CONFLICT)
return results
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _update_bom_relationship(
client: SiloClient,
parent_pn: str,
child_pn: str,
qty: float,
unit_cost: Optional[float],
cells: List[str],
):
"""Create or update a BOM relationship between parent and child."""
metadata = {}
seller_desc = (
cells[sf.COL_SELLER_DESC].strip() if len(cells) > sf.COL_SELLER_DESC else ""
)
if seller_desc:
metadata["seller_description"] = seller_desc
if unit_cost is not None:
metadata["unit_cost"] = unit_cost
sourcing_link = (
cells[sf.COL_SOURCING_LINK].strip() if len(cells) > sf.COL_SOURCING_LINK else ""
)
if sourcing_link:
metadata["sourcing_link"] = sourcing_link
try:
# Try update first (entry may already exist)
client.update_bom_entry(
parent_pn,
child_pn,
quantity=qty,
metadata=metadata if metadata else None,
)
except RuntimeError:
# If update fails, try creating
try:
client.add_bom_entry(
parent_pn,
child_pn,
quantity=qty,
metadata=metadata if metadata else None,
)
except RuntimeError:
pass # Best effort
def _push_properties(client: SiloClient, pn: str, cells: List[str]):
"""Push property column values to the item's latest revision.
Currently this is best-effort -- the API may not support bulk property
updates in a single call. Properties are stored in revision.properties
JSONB on the server side.
"""
# Collect property values from the row
properties = {}
for i, header in enumerate(sf.BOM_PROPERTY_HEADERS):
col_idx = sf.COL_PROP_START + i
if col_idx < len(cells):
val = cells[col_idx].strip()
if val:
db_key = sf.PROPERTY_KEY_MAP.get(header, "")
if db_key:
properties[db_key] = val
if not properties:
return
# The Silo API stores properties on revisions. For now, we'll update
# the item's long_description if it's set, and rely on the revision
# properties being set during create or via revision update.
long_desc = properties.pop("long_description", None)
if long_desc:
try:
client.update_item(pn, long_description=long_desc)
except RuntimeError:
pass
def _update_row_after_push(
sheet, rows: List[List[str]], row_idx: int, item: Dict[str, Any]
):
"""Update sync tracking columns after a successful push."""
from . import pull as _pull
cells = rows[row_idx]
# Update the PN if the server returned one (auto-generated)
server_pn = item.get("part_number", "")
if server_pn and not cells[sf.COL_PN].strip():
cells[sf.COL_PN] = server_pn
_pull._set_cell_string(sheet, sf.COL_PN, row_idx, server_pn)
# Recompute hash and set synced status
sync_engine.update_row_sync_state(
cells,
sync_engine.STATUS_SYNCED,
updated_at=item.get("updated_at", ""),
)
_pull._set_cell_string(sheet, sf.COL_ROW_HASH, row_idx, cells[sf.COL_ROW_HASH])
_pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, cells[sf.COL_ROW_STATUS])
_pull._set_cell_string(sheet, sf.COL_UPDATED_AT, row_idx, cells[sf.COL_UPDATED_AT])
_pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, _pull._STATUS_COLORS["synced"])
def _set_row_status(sheet, row_idx: int, status: str):
"""Set just the status cell and row colour."""
from . import pull as _pull
_pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, status)
color = _pull._STATUS_COLORS.get(status)
if color:
_pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, color)

View File

@@ -1,94 +0,0 @@
"""Persistent settings for the Silo Calc extension.
Settings are stored in ``~/.config/silo/calc-settings.json``.
The file is a flat JSON dict with known keys.
"""
import json
import os
from pathlib import Path
from typing import Any, Dict
_SETTINGS_DIR = (
Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")).expanduser() / "silo"
)
_SETTINGS_FILE = _SETTINGS_DIR / "calc-settings.json"
# Default values for every known key.
_DEFAULTS: Dict[str, Any] = {
"api_url": "",
"api_token": "",
"ssl_verify": True,
"ssl_cert_path": "",
"auth_username": "",
"auth_role": "",
"auth_source": "",
"projects_dir": "", # fallback: SILO_PROJECTS_DIR env or ~/projects
"default_schema": "kindred-rd",
"openrouter_api_key": "", # fallback: OPENROUTER_API_KEY env var
"openrouter_model": "", # fallback: ai_client.DEFAULT_MODEL
"openrouter_instructions": "", # fallback: ai_client.DEFAULT_INSTRUCTIONS
}
def load() -> Dict[str, Any]:
"""Load settings, returning defaults for any missing keys."""
cfg = dict(_DEFAULTS)
if _SETTINGS_FILE.is_file():
try:
with open(_SETTINGS_FILE, "r") as f:
stored = json.load(f)
cfg.update(stored)
except (json.JSONDecodeError, OSError):
pass
return cfg
def save(cfg: Dict[str, Any]) -> None:
"""Persist the full settings dict to disk."""
_SETTINGS_DIR.mkdir(parents=True, exist_ok=True)
with open(_SETTINGS_FILE, "w") as f:
json.dump(cfg, f, indent=2)
def get(key: str, default: Any = None) -> Any:
"""Convenience: load a single key."""
cfg = load()
return cfg.get(key, default)
def put(key: str, value: Any) -> None:
"""Convenience: update a single key and persist."""
cfg = load()
cfg[key] = value
save(cfg)
def save_auth(username: str, role: str = "", source: str = "", token: str = "") -> None:
"""Store authentication info."""
cfg = load()
cfg["auth_username"] = username
cfg["auth_role"] = role
cfg["auth_source"] = source
if token:
cfg["api_token"] = token
save(cfg)
def clear_auth() -> None:
"""Remove stored auth credentials."""
cfg = load()
cfg["api_token"] = ""
cfg["auth_username"] = ""
cfg["auth_role"] = ""
cfg["auth_source"] = ""
save(cfg)
def get_projects_dir() -> Path:
"""Return the resolved projects base directory."""
cfg = load()
d = cfg.get("projects_dir", "")
if not d:
d = os.environ.get("SILO_PROJECTS_DIR", "~/projects")
return Path(d).expanduser()

View File

@@ -1,178 +0,0 @@
"""BOM and Items sheet column layouts, constants, and detection helpers.
This module defines the column structure that matches the engineer's working
BOM format. Hidden property columns and sync-tracking columns are appended
to the right.
"""
from typing import Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Column indices -- BOM sheet
# ---------------------------------------------------------------------------
# Visible core columns (always shown)
BOM_VISIBLE_HEADERS: List[str] = [
"Item", # A - assembly label / section header
"Level", # B - depth in expanded BOM
"Source", # C - sourcing_type (M/P)
"PN", # D - part_number
"Description", # E - item description
"Seller Description", # F - metadata.seller_description
"Unit Cost", # G - standard_cost / metadata.unit_cost
"QTY", # H - quantity on relationship
"Ext Cost", # I - formula =G*H
"Sourcing Link", # J - sourcing_link
"Schema", # K - schema name
]
# Hidden property columns (collapsed group, available when needed)
BOM_PROPERTY_HEADERS: List[str] = [
"Manufacturer", # L
"Manufacturer PN", # M
"Supplier", # N
"Supplier PN", # O
"Lead Time (days)", # P
"Min Order Qty", # Q
"Lifecycle Status", # R
"RoHS Compliant", # S
"Country of Origin", # T
"Material", # U
"Finish", # V
"Notes", # W
"Long Description", # X
]
# Hidden sync columns (never shown to user)
BOM_SYNC_HEADERS: List[str] = [
"_silo_row_hash", # Y - SHA256 of row data at pull time
"_silo_row_status", # Z - synced/modified/new/error
"_silo_updated_at", # AA - server timestamp
"_silo_parent_pn", # AB - parent assembly PN for this BOM entry
]
# All headers in order
BOM_ALL_HEADERS: List[str] = (
BOM_VISIBLE_HEADERS + BOM_PROPERTY_HEADERS + BOM_SYNC_HEADERS
)
# Index constants for quick access
COL_ITEM = 0
COL_LEVEL = 1
COL_SOURCE = 2
COL_PN = 3
COL_DESCRIPTION = 4
COL_SELLER_DESC = 5
COL_UNIT_COST = 6
COL_QTY = 7
COL_EXT_COST = 8
COL_SOURCING_LINK = 9
COL_SCHEMA = 10
# Property column range
COL_PROP_START = len(BOM_VISIBLE_HEADERS) # 11
COL_PROP_END = COL_PROP_START + len(BOM_PROPERTY_HEADERS) # 24
# Sync column range
COL_SYNC_START = COL_PROP_END # 24
COL_ROW_HASH = COL_SYNC_START # 24
COL_ROW_STATUS = COL_SYNC_START + 1 # 25
COL_UPDATED_AT = COL_SYNC_START + 2 # 26
COL_PARENT_PN = COL_SYNC_START + 3 # 27
# Total column count
BOM_TOTAL_COLS = len(BOM_ALL_HEADERS)
# ---------------------------------------------------------------------------
# Items sheet columns (flat list of all items for a project)
# ---------------------------------------------------------------------------
ITEMS_HEADERS: List[str] = [
"PN",
"Description",
"Type",
"Source",
"Schema",
"Standard Cost",
"Sourcing Link",
"Long Description",
"Manufacturer",
"Manufacturer PN",
"Supplier",
"Supplier PN",
"Lead Time (days)",
"Min Order Qty",
"Lifecycle Status",
"RoHS Compliant",
"Country of Origin",
"Material",
"Finish",
"Notes",
"Created",
"Updated",
]
# ---------------------------------------------------------------------------
# Property key mapping (header name -> DB field path)
# ---------------------------------------------------------------------------
PROPERTY_KEY_MAP: Dict[str, str] = {
"Manufacturer": "manufacturer",
"Manufacturer PN": "manufacturer_pn",
"Supplier": "supplier",
"Supplier PN": "supplier_pn",
"Lead Time (days)": "lead_time_days",
"Min Order Qty": "minimum_order_qty",
"Lifecycle Status": "lifecycle_status",
"RoHS Compliant": "rohs_compliant",
"Country of Origin": "country_of_origin",
"Material": "material",
"Finish": "finish",
"Notes": "notes",
}
# Reverse map
DB_FIELD_TO_HEADER: Dict[str, str] = {v: k for k, v in PROPERTY_KEY_MAP.items()}
# ---------------------------------------------------------------------------
# Row status colours (RGB tuples, 0-255)
# ---------------------------------------------------------------------------
STATUS_COLORS: Dict[str, Tuple[int, int, int]] = {
"synced": (198, 239, 206), # light green #C6EFCE
"modified": (255, 235, 156), # light yellow #FFEB9C
"new": (189, 215, 238), # light blue #BDD7EE
"error": (255, 199, 206), # light red #FFC7CE
"conflict": (244, 176, 132), # orange #F4B084
}
# ---------------------------------------------------------------------------
# Sheet type detection
# ---------------------------------------------------------------------------
def detect_sheet_type(headers: List[str]) -> Optional[str]:
"""Detect sheet type from the first row of headers.
Returns ``"bom"``, ``"items"``, or ``None`` if unrecognised.
"""
if not headers:
return None
# Normalise for comparison
norm = [h.strip().lower() for h in headers]
if "item" in norm and "level" in norm and "qty" in norm:
return "bom"
if "pn" in norm and "type" in norm:
return "items"
return None
def col_letter(index: int) -> str:
"""Convert 0-based column index to spreadsheet letter (A, B, ..., AA, AB)."""
result = ""
while True:
result = chr(65 + index % 26) + result
index = index // 26 - 1
if index < 0:
break
return result

View File

@@ -1,160 +0,0 @@
"""Row hashing, diff classification, and sync state tracking.
Used by push/pull commands to detect which rows have been modified locally
since the last pull, and to detect conflicts with server-side changes.
"""
import hashlib
import json
from typing import Any, Dict, List, Optional, Tuple
from . import sheet_format as sf
# Row statuses
STATUS_SYNCED = "synced"
STATUS_MODIFIED = "modified"
STATUS_NEW = "new"
STATUS_ERROR = "error"
STATUS_CONFLICT = "conflict"
def compute_row_hash(cells: List[str]) -> str:
"""SHA-256 hash of the visible + property columns of a row.
Only the data columns are hashed (not sync tracking columns).
Blank/empty cells are normalised to the empty string.
"""
# Use columns 0..COL_PROP_END-1 (visible + properties, not sync cols)
data_cells = cells[: sf.COL_PROP_END]
# Normalise
normalised = [str(c).strip() if c else "" for c in data_cells]
raw = "\t".join(normalised).encode("utf-8")
return hashlib.sha256(raw).hexdigest()
def classify_row(cells: List[str]) -> str:
"""Return the sync status of a single row.
Reads the stored hash and current cell values to determine whether
the row is synced, modified, new, or in an error state.
"""
# Ensure we have enough columns
while len(cells) < sf.BOM_TOTAL_COLS:
cells.append("")
stored_hash = cells[sf.COL_ROW_HASH].strip() if cells[sf.COL_ROW_HASH] else ""
stored_status = cells[sf.COL_ROW_STATUS].strip() if cells[sf.COL_ROW_STATUS] else ""
# No hash -> new row (never pulled from server)
if not stored_hash:
# Check if there's any data in the row
has_data = any(
cells[i].strip()
for i in range(sf.COL_PROP_END)
if i < len(cells) and cells[i]
)
return STATUS_NEW if has_data else ""
# Compute current hash and compare
current_hash = compute_row_hash(cells)
if current_hash == stored_hash:
return STATUS_SYNCED
return STATUS_MODIFIED
def classify_rows(all_rows: List[List[str]]) -> List[Tuple[int, str, List[str]]]:
"""Classify every row in a sheet.
Returns list of ``(row_index, status, cells)`` for rows that have data.
Blank separator rows and the header row (index 0) are skipped.
"""
results = []
for i, cells in enumerate(all_rows):
if i == 0:
continue # header row
status = classify_row(list(cells))
if status:
results.append((i, status, list(cells)))
return results
def build_push_diff(
classified: List[Tuple[int, str, List[str]]],
server_timestamps: Optional[Dict[str, str]] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""Build a push diff from classified rows.
*server_timestamps* maps part numbers to their server ``updated_at``
values, used for conflict detection.
Returns a dict with keys ``new``, ``modified``, ``conflicts``, and
the count of ``unchanged`` rows.
"""
server_ts = server_timestamps or {}
new_rows = []
modified_rows = []
conflicts = []
unchanged = 0
for row_idx, status, cells in classified:
if status == STATUS_SYNCED:
unchanged += 1
continue
pn = cells[sf.COL_PN].strip() if len(cells) > sf.COL_PN else ""
stored_ts = (
cells[sf.COL_UPDATED_AT].strip()
if len(cells) > sf.COL_UPDATED_AT and cells[sf.COL_UPDATED_AT]
else ""
)
row_info = {
"row_index": row_idx,
"part_number": pn,
"description": cells[sf.COL_DESCRIPTION].strip()
if len(cells) > sf.COL_DESCRIPTION
else "",
"cells": cells[: sf.COL_PROP_END],
}
if status == STATUS_NEW:
new_rows.append(row_info)
elif status == STATUS_MODIFIED:
# Check for conflict: server changed since we pulled
server_updated = server_ts.get(pn, "")
if stored_ts and server_updated and server_updated != stored_ts:
row_info["local_ts"] = stored_ts
row_info["server_ts"] = server_updated
conflicts.append(row_info)
else:
modified_rows.append(row_info)
return {
"new": new_rows,
"modified": modified_rows,
"conflicts": conflicts,
"unchanged": unchanged,
}
def update_row_sync_state(
cells: List[str],
status: str,
updated_at: str = "",
parent_pn: str = "",
) -> List[str]:
"""Set the sync tracking columns on a row and return it.
Recomputes the row hash from current visible+property data.
"""
while len(cells) < sf.BOM_TOTAL_COLS:
cells.append("")
cells[sf.COL_ROW_HASH] = compute_row_hash(cells)
cells[sf.COL_ROW_STATUS] = status
if updated_at:
cells[sf.COL_UPDATED_AT] = updated_at
if parent_pn:
cells[sf.COL_PARENT_PN] = parent_pn
return cells

View File

@@ -1,496 +0,0 @@
"""UNO ProtocolHandler component for the Silo Calc extension.
This file is registered in META-INF/manifest.xml and acts as the entry
point for all toolbar / menu commands. Each custom protocol URL
dispatches to a handler function that orchestrates the corresponding
feature.
All silo_calc submodule imports are deferred to handler call time so
that the component registration always succeeds even if a submodule
has issues.
"""
import os
import sys
import traceback
import uno
import unohelper
from com.sun.star.frame import XDispatch, XDispatchProvider
from com.sun.star.lang import XInitialization, XServiceInfo
# Ensure pythonpath/ is importable
_ext_dir = os.path.dirname(os.path.abspath(__file__))
_pypath = os.path.join(_ext_dir, "pythonpath")
if _pypath not in sys.path:
sys.path.insert(0, _pypath)
# Service identifiers
_IMPL_NAME = "io.kindredsystems.silo.calc.Component"
_SERVICE_NAME = "com.sun.star.frame.ProtocolHandler"
_PROTOCOL = "io.kindredsystems.silo.calc:"
def _log(msg: str):
"""Print to the LibreOffice terminal / stderr."""
print(f"[Silo Calc] {msg}")
def _get_desktop():
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
return smgr.createInstanceWithContext("com.sun.star.frame.Desktop", ctx)
def _get_active_sheet():
"""Return (doc, sheet) for the current spreadsheet, or (None, None)."""
desktop = _get_desktop()
doc = desktop.getCurrentComponent()
if doc is None:
return None, None
if not doc.supportsService("com.sun.star.sheet.SpreadsheetDocument"):
return None, None
sheet = doc.getSheets().getByIndex(
doc.getCurrentController().getActiveSheet().getRangeAddress().Sheet
)
return doc, sheet
def _msgbox(title, message, box_type="infobox"):
"""Lightweight message box that doesn't depend on dialogs module."""
ctx = uno.getComponentContext()
smgr = ctx.ServiceManager
toolkit = smgr.createInstanceWithContext("com.sun.star.awt.Toolkit", ctx)
parent = _get_desktop().getCurrentFrame().getContainerWindow()
mbt = uno.Enum(
"com.sun.star.awt.MessageBoxType",
"INFOBOX" if box_type == "infobox" else "ERRORBOX",
)
box = toolkit.createMessageBox(parent, mbt, 1, title, message)
box.execute()
# ---------------------------------------------------------------------------
# Command handlers -- imports are deferred to call time
# ---------------------------------------------------------------------------
def _cmd_login(frame):
from silo_calc import dialogs
dialogs.show_login_dialog()
def _cmd_settings(frame):
from silo_calc import dialogs
dialogs.show_settings_dialog()
def _cmd_pull_bom(frame):
"""Pull a BOM from the server and populate the active sheet."""
from silo_calc import dialogs, project_files
from silo_calc import pull as _pull
from silo_calc import settings as _settings
from silo_calc.client import SiloClient
client = SiloClient()
if not client.is_authenticated():
dialogs.show_login_dialog()
client = SiloClient() # reload after login
if not client.is_authenticated():
return
pn = dialogs.show_assembly_picker(client)
if not pn:
return
project_code = (
dialogs._input_box("Pull BOM", "Project code for auto-tagging (optional):")
or ""
)
doc, sheet = _get_active_sheet()
if doc is None:
desktop = _get_desktop()
doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ())
sheet = doc.getSheets().getByIndex(0)
sheet.setName("BOM")
try:
count = _pull.pull_bom(
client,
doc,
sheet,
pn,
project_code=project_code.strip(),
schema=_settings.get("default_schema", "kindred-rd"),
)
_log(f"Pulled BOM for {pn}: {count} rows")
if project_code.strip():
path = project_files.get_project_sheet_path(project_code.strip())
project_files.ensure_project_dir(project_code.strip())
url = uno.systemPathToFileUrl(str(path))
doc.storeToURL(url, ())
_log(f"Saved to {path}")
_msgbox("Pull BOM", f"Pulled {count} rows for {pn}.")
except RuntimeError as e:
_msgbox("Pull BOM Failed", str(e), box_type="errorbox")
def _cmd_pull_project(frame):
"""Pull all project items as a multi-sheet workbook."""
from silo_calc import dialogs, project_files
from silo_calc import pull as _pull
from silo_calc import settings as _settings
from silo_calc.client import SiloClient
client = SiloClient()
if not client.is_authenticated():
dialogs.show_login_dialog()
client = SiloClient()
if not client.is_authenticated():
return
code = dialogs.show_project_picker(client)
if not code:
return
doc, _ = _get_active_sheet()
if doc is None:
desktop = _get_desktop()
doc = desktop.loadComponentFromURL("private:factory/scalc", "_blank", 0, ())
try:
count = _pull.pull_project(
client,
doc,
code.strip(),
schema=_settings.get("default_schema", "kindred-rd"),
)
_log(f"Pulled project {code}: {count} items")
path = project_files.get_project_sheet_path(code.strip())
project_files.ensure_project_dir(code.strip())
url = uno.systemPathToFileUrl(str(path))
doc.storeToURL(url, ())
_log(f"Saved to {path}")
_msgbox("Pull Project", f"Pulled {count} items for project {code}.")
except RuntimeError as e:
_msgbox("Pull Project Failed", str(e), box_type="errorbox")
def _cmd_push(frame):
"""Push local changes back to the server."""
from silo_calc import dialogs, sync_engine
from silo_calc import push as _push
from silo_calc import settings as _settings
from silo_calc import sheet_format as sf
from silo_calc.client import SiloClient
client = SiloClient()
if not client.is_authenticated():
dialogs.show_login_dialog()
client = SiloClient()
if not client.is_authenticated():
return
doc, sheet = _get_active_sheet()
if doc is None or sheet is None:
_msgbox("Push", "No active spreadsheet.", box_type="errorbox")
return
rows = _push._read_sheet_rows(sheet)
classified = sync_engine.classify_rows(rows)
modified_pns = [
cells[sf.COL_PN].strip()
for _, status, cells in classified
if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip()
]
server_ts = _push._fetch_server_timestamps(client, modified_pns)
diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts)
ok = dialogs.show_push_summary(
new_count=len(diff["new"]),
modified_count=len(diff["modified"]),
conflict_count=len(diff["conflicts"]),
unchanged_count=diff["unchanged"],
)
if not ok:
return
try:
results = _push.push_sheet(
client,
doc,
sheet,
schema=_settings.get("default_schema", "kindred-rd"),
)
except RuntimeError as e:
_msgbox("Push Failed", str(e), box_type="errorbox")
return
try:
file_url = doc.getURL()
if file_url:
doc.store()
except Exception:
pass
summary_lines = [
f"Created: {results['created']}",
f"Updated: {results['updated']}",
f"Conflicts: {results.get('conflicts', 0)}",
f"Skipped: {results['skipped']}",
]
if results["errors"]:
summary_lines.append(f"\nErrors ({len(results['errors'])}):")
for err in results["errors"][:10]:
summary_lines.append(f" - {err}")
if len(results["errors"]) > 10:
summary_lines.append(f" ... and {len(results['errors']) - 10} more")
_msgbox("Push Complete", "\n".join(summary_lines))
_log(f"Push complete: {results['created']} created, {results['updated']} updated")
def _cmd_add_item(frame):
"""Completion wizard for adding a new BOM row."""
from silo_calc import completion_wizard as _wizard
from silo_calc import settings as _settings
from silo_calc.client import SiloClient
client = SiloClient()
if not client.is_authenticated():
from silo_calc import dialogs
dialogs.show_login_dialog()
client = SiloClient()
if not client.is_authenticated():
return
doc, sheet = _get_active_sheet()
if doc is None or sheet is None:
_msgbox("Add Item", "No active spreadsheet.", box_type="errorbox")
return
project_code = ""
try:
file_url = doc.getURL()
if file_url:
file_path = uno.fileUrlToSystemPath(file_url)
parts = file_path.replace("\\", "/").split("/")
if "sheets" in parts:
idx = parts.index("sheets")
if idx + 1 < len(parts):
project_code = parts[idx + 1]
except Exception:
pass
cursor = sheet.createCursor()
cursor.gotoStartOfUsedArea(False)
cursor.gotoEndOfUsedArea(True)
insert_row = cursor.getRangeAddress().EndRow + 1
ok = _wizard.run_completion_wizard(
client,
doc,
sheet,
insert_row,
project_code=project_code,
schema=_settings.get("default_schema", "kindred-rd"),
)
if ok:
_log(f"Added new item at row {insert_row + 1}")
def _cmd_refresh(frame):
"""Re-pull the current sheet from server."""
_msgbox("Refresh", "Refresh -- coming soon.")
def _cmd_ai_description(frame):
"""Generate an AI description from the seller description in the current row."""
from silo_calc import ai_client as _ai
from silo_calc import dialogs
from silo_calc import pull as _pull
from silo_calc import sheet_format as sf
if not _ai.is_configured():
_msgbox(
"AI Describe",
"OpenRouter API key not configured.\n\n"
"Set it in Silo Settings or via the OPENROUTER_API_KEY environment variable.",
box_type="errorbox",
)
return
doc, sheet = _get_active_sheet()
if doc is None or sheet is None:
_msgbox("AI Describe", "No active spreadsheet.", box_type="errorbox")
return
controller = doc.getCurrentController()
selection = controller.getSelection()
try:
cell_addr = selection.getCellAddress()
row = cell_addr.Row
except AttributeError:
try:
range_addr = selection.getRangeAddress()
row = range_addr.StartRow
except AttributeError:
_msgbox("AI Describe", "Select a cell in a BOM row.", box_type="errorbox")
return
if row == 0:
_msgbox(
"AI Describe", "Select a data row, not the header.", box_type="errorbox"
)
return
seller_desc = sheet.getCellByPosition(sf.COL_SELLER_DESC, row).getString().strip()
if not seller_desc:
_msgbox(
"AI Describe",
f"No seller description in column F (row {row + 1}).",
box_type="errorbox",
)
return
existing_desc = sheet.getCellByPosition(sf.COL_DESCRIPTION, row).getString().strip()
part_number = sheet.getCellByPosition(sf.COL_PN, row).getString().strip()
category = part_number[:3] if len(part_number) >= 3 else ""
while True:
try:
ai_desc = _ai.generate_description(
seller_description=seller_desc,
category=category,
existing_description=existing_desc,
part_number=part_number,
)
except RuntimeError as e:
_msgbox("AI Describe Failed", str(e), box_type="errorbox")
return
accepted = dialogs.show_ai_description_dialog(seller_desc, ai_desc)
if accepted is not None:
_pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, accepted)
_log(f"AI description written to row {row + 1}: {accepted}")
return
retry = dialogs._input_box(
"AI Describe",
"Generate again? (yes/no):",
default="no",
)
if not retry or retry.strip().lower() not in ("yes", "y"):
return
# Command dispatch table
_COMMANDS = {
"SiloLogin": _cmd_login,
"SiloPullBOM": _cmd_pull_bom,
"SiloPullProject": _cmd_pull_project,
"SiloPush": _cmd_push,
"SiloAddItem": _cmd_add_item,
"SiloRefresh": _cmd_refresh,
"SiloSettings": _cmd_settings,
"SiloAIDescription": _cmd_ai_description,
}
# ---------------------------------------------------------------------------
# UNO Dispatch implementation
# ---------------------------------------------------------------------------
class SiloDispatch(unohelper.Base, XDispatch):
"""Handles a single dispatched command."""
def __init__(self, command: str, frame):
self._command = command
self._frame = frame
self._listeners = []
def dispatch(self, url, args):
handler = _COMMANDS.get(self._command)
if handler:
try:
handler(self._frame)
except Exception:
_log(f"Error in {self._command}:\n{traceback.format_exc()}")
try:
_msgbox(
f"Silo Error: {self._command}",
traceback.format_exc(),
box_type="errorbox",
)
except Exception:
pass
def addStatusListener(self, listener, url):
self._listeners.append(listener)
def removeStatusListener(self, listener, url):
if listener in self._listeners:
self._listeners.remove(listener)
class SiloDispatchProvider(
unohelper.Base, XDispatchProvider, XInitialization, XServiceInfo
):
"""ProtocolHandler component for Silo commands.
LibreOffice instantiates this via com.sun.star.frame.ProtocolHandler
and calls initialize() with the frame, then queryDispatch() for each
command URL matching our protocol.
"""
def __init__(self, ctx):
self._ctx = ctx
self._frame = None
# XInitialization -- called by framework with the Frame
def initialize(self, args):
if args:
self._frame = args[0]
# XDispatchProvider
def queryDispatch(self, url, target_frame_name, search_flags):
if url.Protocol == _PROTOCOL:
command = url.Path
if command in _COMMANDS:
return SiloDispatch(command, self._frame)
return None
def queryDispatches(self, requests):
return [
self.queryDispatch(r.FeatureURL, r.FrameName, r.SearchFlags)
for r in requests
]
# XServiceInfo
def getImplementationName(self):
return _IMPL_NAME
def supportsService(self, name):
return name == _SERVICE_NAME
def getSupportedServiceNames(self):
return (_SERVICE_NAME,)
# UNO component registration
g_ImplementationHelper = unohelper.ImplementationHelper()
g_ImplementationHelper.addImplementation(
SiloDispatchProvider,
_IMPL_NAME,
(_SERVICE_NAME,),
)

View File

@@ -1,345 +0,0 @@
"""Basic tests for silo_calc modules (no UNO dependency)."""
import hashlib
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
# Add pythonpath to sys.path so we can import without LibreOffice
_pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_pypath = os.path.join(_pkg_dir, "pythonpath")
if _pypath not in sys.path:
sys.path.insert(0, _pypath)
from silo_calc import project_files, sync_engine
from silo_calc import settings as _settings
from silo_calc import sheet_format as sf
class TestSheetFormat(unittest.TestCase):
def test_bom_header_counts(self):
self.assertEqual(len(sf.BOM_VISIBLE_HEADERS), 11)
self.assertEqual(len(sf.BOM_PROPERTY_HEADERS), 13)
self.assertEqual(len(sf.BOM_SYNC_HEADERS), 4)
self.assertEqual(sf.BOM_TOTAL_COLS, 28)
def test_column_indices(self):
self.assertEqual(sf.COL_ITEM, 0)
self.assertEqual(sf.COL_PN, 3)
self.assertEqual(sf.COL_UNIT_COST, 6)
self.assertEqual(sf.COL_QTY, 7)
self.assertEqual(sf.COL_EXT_COST, 8)
def test_detect_sheet_type_bom(self):
headers = [
"Item",
"Level",
"Source",
"PN",
"Description",
"Seller Description",
"Unit Cost",
"QTY",
"Ext Cost",
]
self.assertEqual(sf.detect_sheet_type(headers), "bom")
def test_detect_sheet_type_items(self):
headers = ["PN", "Description", "Type", "Source"]
self.assertEqual(sf.detect_sheet_type(headers), "items")
def test_detect_sheet_type_unknown(self):
self.assertIsNone(sf.detect_sheet_type([]))
self.assertIsNone(sf.detect_sheet_type(["Foo", "Bar"]))
def test_col_letter(self):
self.assertEqual(sf.col_letter(0), "A")
self.assertEqual(sf.col_letter(25), "Z")
self.assertEqual(sf.col_letter(26), "AA")
self.assertEqual(sf.col_letter(27), "AB")
def test_property_key_map_bidirectional(self):
for header, key in sf.PROPERTY_KEY_MAP.items():
self.assertEqual(sf.DB_FIELD_TO_HEADER[key], header)
class TestSyncEngine(unittest.TestCase):
def _make_row(self, pn="F01-0001", desc="Test", cost="10.00", qty="2"):
"""Create a minimal BOM row with enough columns."""
cells = [""] * sf.BOM_TOTAL_COLS
cells[sf.COL_PN] = pn
cells[sf.COL_DESCRIPTION] = desc
cells[sf.COL_UNIT_COST] = cost
cells[sf.COL_QTY] = qty
return cells
def test_compute_row_hash_deterministic(self):
row = self._make_row()
h1 = sync_engine.compute_row_hash(row)
h2 = sync_engine.compute_row_hash(row)
self.assertEqual(h1, h2)
self.assertEqual(len(h1), 64) # SHA-256 hex
def test_compute_row_hash_changes(self):
row1 = self._make_row(cost="10.00")
row2 = self._make_row(cost="20.00")
self.assertNotEqual(
sync_engine.compute_row_hash(row1),
sync_engine.compute_row_hash(row2),
)
def test_classify_row_new(self):
row = self._make_row()
# No stored hash -> new
self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_NEW)
def test_classify_row_synced(self):
row = self._make_row()
# Set stored hash to current hash
row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row)
row[sf.COL_ROW_STATUS] = "synced"
self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_SYNCED)
def test_classify_row_modified(self):
row = self._make_row()
row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row)
# Now change a cell
row[sf.COL_UNIT_COST] = "99.99"
self.assertEqual(sync_engine.classify_row(row), sync_engine.STATUS_MODIFIED)
def test_classify_rows_skips_header(self):
header = list(sf.BOM_ALL_HEADERS)
row1 = self._make_row()
all_rows = [header, row1]
classified = sync_engine.classify_rows(all_rows)
# Header row (index 0) should be skipped
self.assertEqual(len(classified), 1)
self.assertEqual(classified[0][0], 1) # row index
def test_update_row_sync_state(self):
row = self._make_row()
updated = sync_engine.update_row_sync_state(
row, "synced", updated_at="2025-01-01T00:00:00Z", parent_pn="A01-0003"
)
self.assertEqual(updated[sf.COL_ROW_STATUS], "synced")
self.assertEqual(updated[sf.COL_UPDATED_AT], "2025-01-01T00:00:00Z")
self.assertEqual(updated[sf.COL_PARENT_PN], "A01-0003")
# Hash should be set
self.assertEqual(len(updated[sf.COL_ROW_HASH]), 64)
def test_build_push_diff(self):
row_new = self._make_row(pn="NEW-0001")
row_synced = self._make_row(pn="F01-0001")
row_synced[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_synced)
row_modified = self._make_row(pn="F01-0002")
row_modified[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row_modified)
row_modified[sf.COL_UNIT_COST] = "999.99" # change after hash
classified = [
(1, sync_engine.STATUS_NEW, row_new),
(2, sync_engine.STATUS_SYNCED, row_synced),
(3, sync_engine.STATUS_MODIFIED, row_modified),
]
diff = sync_engine.build_push_diff(classified)
self.assertEqual(len(diff["new"]), 1)
self.assertEqual(len(diff["modified"]), 1)
self.assertEqual(diff["unchanged"], 1)
self.assertEqual(len(diff["conflicts"]), 0)
def test_conflict_detection(self):
row = self._make_row(pn="F01-0001")
row[sf.COL_ROW_HASH] = sync_engine.compute_row_hash(row)
row[sf.COL_UPDATED_AT] = "2025-01-01T00:00:00Z"
row[sf.COL_UNIT_COST] = "changed" # local modification
classified = [(1, sync_engine.STATUS_MODIFIED, row)]
diff = sync_engine.build_push_diff(
classified,
server_timestamps={
"F01-0001": "2025-06-01T00:00:00Z"
}, # server changed too
)
self.assertEqual(len(diff["conflicts"]), 1)
self.assertEqual(len(diff["modified"]), 0)
class TestSettings(unittest.TestCase):
def test_load_defaults(self):
# Use a temp dir so we don't touch real settings
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
cfg = _settings.load()
self.assertEqual(cfg["ssl_verify"], True)
self.assertEqual(cfg["default_schema"], "kindred-rd")
def test_save_and_load(self):
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("api_url", "https://silo.test/api")
cfg = _settings.load()
self.assertEqual(cfg["api_url"], "https://silo.test/api")
def test_save_auth_and_clear(self):
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.save_auth("testuser", "editor", "local", "silo_abc123")
cfg = _settings.load()
self.assertEqual(cfg["auth_username"], "testuser")
self.assertEqual(cfg["api_token"], "silo_abc123")
_settings.clear_auth()
cfg = _settings.load()
self.assertEqual(cfg["api_token"], "")
self.assertEqual(cfg["auth_username"], "")
class TestProjectFiles(unittest.TestCase):
def test_get_project_sheet_path(self):
path = project_files.get_project_sheet_path("3DX10")
self.assertTrue(str(path).endswith("sheets/3DX10/3DX10.ods"))
def test_save_and_read(self):
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("projects_dir", tmp)
test_data = b"PK\x03\x04fake-ods-content"
path = project_files.save_project_sheet("TEST", test_data)
self.assertTrue(path.is_file())
self.assertEqual(path.name, "TEST.ods")
read_back = project_files.read_project_sheet("TEST")
self.assertEqual(read_back, test_data)
def test_list_project_sheets(self):
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("projects_dir", tmp)
# Create two project dirs
for code in ("AAA", "BBB"):
d = Path(tmp) / "sheets" / code
d.mkdir(parents=True)
(d / f"{code}.ods").write_bytes(b"fake")
sheets = project_files.list_project_sheets()
codes = [s[0] for s in sheets]
self.assertIn("AAA", codes)
self.assertIn("BBB", codes)
class TestAIClient(unittest.TestCase):
"""Test ai_client helpers that don't require network or UNO."""
def test_default_constants(self):
from silo_calc import ai_client
self.assertTrue(ai_client.OPENROUTER_API_URL.startswith("https://"))
self.assertTrue(len(ai_client.DEFAULT_MODEL) > 0)
self.assertTrue(len(ai_client.DEFAULT_INSTRUCTIONS) > 0)
def test_is_configured_false_by_default(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
old = os.environ.pop("OPENROUTER_API_KEY", None)
try:
self.assertFalse(ai_client.is_configured())
finally:
if old is not None:
os.environ["OPENROUTER_API_KEY"] = old
def test_is_configured_with_env_var(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
old = os.environ.get("OPENROUTER_API_KEY")
os.environ["OPENROUTER_API_KEY"] = "sk-test-key"
try:
self.assertTrue(ai_client.is_configured())
finally:
if old is not None:
os.environ["OPENROUTER_API_KEY"] = old
else:
os.environ.pop("OPENROUTER_API_KEY", None)
def test_is_configured_with_settings(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("openrouter_api_key", "sk-test-key")
old = os.environ.pop("OPENROUTER_API_KEY", None)
try:
self.assertTrue(ai_client.is_configured())
finally:
if old is not None:
os.environ["OPENROUTER_API_KEY"] = old
def test_chat_completion_missing_key_raises(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
old = os.environ.pop("OPENROUTER_API_KEY", None)
try:
with self.assertRaises(RuntimeError) as ctx:
ai_client.chat_completion([{"role": "user", "content": "test"}])
self.assertIn("not configured", str(ctx.exception))
finally:
if old is not None:
os.environ["OPENROUTER_API_KEY"] = old
def test_get_model_default(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
self.assertEqual(ai_client._get_model(), ai_client.DEFAULT_MODEL)
def test_get_model_from_settings(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("openrouter_model", "anthropic/claude-3-haiku")
self.assertEqual(ai_client._get_model(), "anthropic/claude-3-haiku")
def test_get_instructions_default(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
self.assertEqual(
ai_client._get_instructions(), ai_client.DEFAULT_INSTRUCTIONS
)
def test_get_instructions_from_settings(self):
from silo_calc import ai_client
with tempfile.TemporaryDirectory() as tmp:
_settings._SETTINGS_DIR = Path(tmp)
_settings._SETTINGS_FILE = Path(tmp) / "test-settings.json"
_settings.put("openrouter_instructions", "Custom instructions")
self.assertEqual(ai_client._get_instructions(), "Custom instructions")
if __name__ == "__main__":
unittest.main()