LibreOffice Calc extension for Silo PLM integration. Uses shared silo-client package (submodule) for API communication. Changes from monorepo version: - SiloClient class removed from client.py, replaced with CalcSiloSettings adapter + factory function wrapping silo_client.SiloClient - silo_calc_component.py adds silo-client to sys.path - Makefile build-oxt copies silo_client into .oxt for self-contained packaging - All other modules unchanged
502 lines
15 KiB
Python
502 lines
15 KiB
Python
"""UNO ProtocolHandler component for the Silo Calc extension.
|
|
|
|
This file is registered in META-INF/manifest.xml and acts as the entry
|
|
point for all toolbar / menu commands. Each custom protocol URL
|
|
dispatches to a handler function that orchestrates the corresponding
|
|
feature.
|
|
|
|
All silo_calc submodule imports are deferred to handler call time so
|
|
that the component registration always succeeds even if a submodule
|
|
has issues.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import traceback
|
|
|
|
import uno
|
|
import unohelper
|
|
from com.sun.star.frame import XDispatch, XDispatchProvider
|
|
from com.sun.star.lang import XInitialization, XServiceInfo
|
|
|
|
# Make the directories shipped next to this file importable.  Each one is
# pushed to the front of sys.path (unless already listed), so the entry
# processed last -- silo-client/ -- ends up first on the search path.
_ext_dir = os.path.dirname(os.path.abspath(__file__))
_pypath = os.path.join(_ext_dir, "pythonpath")
_client_path = os.path.join(_ext_dir, "silo-client")
for _bundled_dir in (_pypath, _client_path):
    if _bundled_dir not in sys.path:
        sys.path.insert(0, _bundled_dir)
del _bundled_dir

# Identifiers used for UNO registration and for matching command URLs.
_IMPL_NAME = "io.kindredsystems.silo.calc.Component"
_SERVICE_NAME = "com.sun.star.frame.ProtocolHandler"
_PROTOCOL = "io.kindredsystems.silo.calc:"
|
|
|
|
|
|
def _log(msg: str):
    """Write *msg* to standard output, tagged with the extension name.

    The output is only visible when LibreOffice was started from a
    terminal (or has its stdout captured some other way).
    """
    tagged = f"[Silo Calc] {msg}"
    print(tagged)
|
|
|
|
|
|
def _get_desktop():
    """Create and return a ``com.sun.star.frame.Desktop`` service instance."""
    context = uno.getComponentContext()
    return context.ServiceManager.createInstanceWithContext(
        "com.sun.star.frame.Desktop", context
    )
|
|
|
|
|
|
def _get_active_sheet():
    """Locate the spreadsheet document and sheet the user is working in.

    Returns:
        ``(doc, sheet)`` when the desktop's current component is a
        spreadsheet document, otherwise ``(None, None)``.
    """
    doc = _get_desktop().getCurrentComponent()
    is_spreadsheet = doc is not None and doc.supportsService(
        "com.sun.star.sheet.SpreadsheetDocument"
    )
    if not is_spreadsheet:
        return None, None

    # Resolve the active sheet through its index in the document's sheet
    # collection rather than returning the controller's object directly.
    active = doc.getCurrentController().getActiveSheet()
    sheet_index = active.getRangeAddress().Sheet
    return doc, doc.getSheets().getByIndex(sheet_index)
|
|
|
|
|
|
def _msgbox(title, message, box_type="infobox"):
    """Show a modal message box with a single OK button.

    Built directly on the AWT toolkit so it works without importing the
    silo_calc dialogs module.

    Args:
        title: Window title of the box.
        message: Body text.
        box_type: ``"infobox"`` for an information box; any other value
            produces an error box.
    """
    context = uno.getComponentContext()
    toolkit = context.ServiceManager.createInstanceWithContext(
        "com.sun.star.awt.Toolkit", context
    )
    parent = _get_desktop().getCurrentFrame().getContainerWindow()

    if box_type == "infobox":
        kind_name = "INFOBOX"
    else:
        kind_name = "ERRORBOX"
    kind = uno.Enum("com.sun.star.awt.MessageBoxType", kind_name)

    # 1 == com.sun.star.awt.MessageBoxButtons.BUTTONS_OK
    toolkit.createMessageBox(parent, kind, 1, title, message).execute()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Command handlers -- imports are deferred to call time
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _cmd_login(frame):
    """Open the login dialog.  *frame* is accepted but not used."""
    # Deferred import: keeps component registration independent of silo_calc.
    import silo_calc.dialogs as _dialogs

    _dialogs.show_login_dialog()
|
|
|
|
|
|
def _cmd_settings(frame):
    """Open the settings dialog.  *frame* is accepted but not used."""
    # Deferred import: keeps component registration independent of silo_calc.
    import silo_calc.dialogs as _dialogs

    _dialogs.show_settings_dialog()
|
|
|
|
|
|
def _cmd_pull_bom(frame):
    """Pull a BOM from the server and populate the active sheet.

    Flow: make sure the client is authenticated (offering the login dialog
    once), let the user pick an assembly and an optional project code, pull
    into the active sheet (or into a freshly created spreadsheet when there
    is none), and -- if a project code was given -- store a copy of the
    document at the project's sheet path.  ``RuntimeError`` raised along the
    way is reported in an error box.  *frame* is not used.
    """
    from silo_calc import dialogs, project_files
    from silo_calc import pull as _pull
    from silo_calc import settings as _settings
    from silo_calc.client import SiloClient

    client = SiloClient()
    if not client.is_authenticated():
        dialogs.show_login_dialog()
        client = SiloClient()  # fresh instance so it sees the new login
    if not client.is_authenticated():
        return

    pn = dialogs.show_assembly_picker(client)
    if not pn:
        return

    answer = dialogs._input_box(
        "Pull BOM", "Project code for auto-tagging (optional):"
    )
    # A cancelled / empty prompt simply means "no project".
    project_code = (answer or "").strip()

    doc, sheet = _get_active_sheet()
    if doc is None:
        # No spreadsheet open: start a new one and use its first sheet.
        doc = _get_desktop().loadComponentFromURL(
            "private:factory/scalc", "_blank", 0, ()
        )
        sheet = doc.getSheets().getByIndex(0)
        sheet.setName("BOM")

    try:
        schema = _settings.get("default_schema", "kindred-rd")
        count = _pull.pull_bom(
            client, doc, sheet, pn, project_code=project_code, schema=schema
        )
        _log(f"Pulled BOM for {pn}: {count} rows")

        if project_code:
            path = project_files.get_project_sheet_path(project_code)
            project_files.ensure_project_dir(project_code)
            doc.storeToURL(uno.systemPathToFileUrl(str(path)), ())
            _log(f"Saved to {path}")

        _msgbox("Pull BOM", f"Pulled {count} rows for {pn}.")
    except RuntimeError as e:
        _msgbox("Pull BOM Failed", str(e), box_type="errorbox")
|
|
|
|
|
|
def _cmd_pull_project(frame):
    """Pull all project items as a multi-sheet workbook.

    Flow: make sure the client is authenticated (offering the login dialog
    once), let the user pick a project, pull it into the active spreadsheet
    document (or a freshly created one), then store a copy at the project's
    sheet path.  ``RuntimeError`` raised along the way is reported in an
    error box.  *frame* is not used.
    """
    from silo_calc import dialogs, project_files
    from silo_calc import pull as _pull
    from silo_calc import settings as _settings
    from silo_calc.client import SiloClient

    client = SiloClient()
    if not client.is_authenticated():
        dialogs.show_login_dialog()
        client = SiloClient()  # fresh instance so it sees the new login
    if not client.is_authenticated():
        return

    code = dialogs.show_project_picker(client)
    if not code:
        return
    project = code.strip()

    doc, _ = _get_active_sheet()
    if doc is None:
        # No spreadsheet open: pull into a new, empty one.
        doc = _get_desktop().loadComponentFromURL(
            "private:factory/scalc", "_blank", 0, ()
        )

    try:
        schema = _settings.get("default_schema", "kindred-rd")
        count = _pull.pull_project(client, doc, project, schema=schema)
        _log(f"Pulled project {code}: {count} items")

        path = project_files.get_project_sheet_path(project)
        project_files.ensure_project_dir(project)
        doc.storeToURL(uno.systemPathToFileUrl(str(path)), ())
        _log(f"Saved to {path}")

        _msgbox("Pull Project", f"Pulled {count} items for project {code}.")
    except RuntimeError as e:
        _msgbox("Pull Project Failed", str(e), box_type="errorbox")
|
|
|
|
|
|
def _cmd_push(frame):
    """Push local changes back to the server.

    Steps:
      1. Ensure an authenticated client (offering the login dialog once).
      2. Read and classify the rows of the active sheet, fetch server
         timestamps for the modified part numbers and build a diff.
      3. Show the diff counts and let the user confirm or cancel.
      4. Push the sheet; a ``RuntimeError`` is reported in an error box.
      5. Best-effort save of the document if it already has a location.
      6. Show a result summary listing at most the first ten errors.

    *frame* is not used.
    """
    from silo_calc import dialogs, sync_engine
    from silo_calc import push as _push
    from silo_calc import settings as _settings
    from silo_calc import sheet_format as sf
    from silo_calc.client import SiloClient

    client = SiloClient()
    if not client.is_authenticated():
        dialogs.show_login_dialog()
        client = SiloClient()  # fresh instance so it sees the new login
    if not client.is_authenticated():
        return

    doc, sheet = _get_active_sheet()
    if doc is None or sheet is None:
        _msgbox("Push", "No active spreadsheet.", box_type="errorbox")
        return

    rows = _push._read_sheet_rows(sheet)
    classified = sync_engine.classify_rows(rows)

    # Only modified rows that actually carry a part number are looked up
    # on the server.
    modified_pns = [
        cells[sf.COL_PN].strip()
        for _, status, cells in classified
        if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip()
    ]
    server_ts = _push._fetch_server_timestamps(client, modified_pns)
    diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts)

    ok = dialogs.show_push_summary(
        new_count=len(diff["new"]),
        modified_count=len(diff["modified"]),
        conflict_count=len(diff["conflicts"]),
        unchanged_count=diff["unchanged"],
    )
    if not ok:
        return

    try:
        results = _push.push_sheet(
            client,
            doc,
            sheet,
            schema=_settings.get("default_schema", "kindred-rd"),
        )
    except RuntimeError as e:
        _msgbox("Push Failed", str(e), box_type="errorbox")
        return

    # Saving is best-effort: the push itself already succeeded, so a failed
    # save must not abort the summary -- but it is logged, not hidden.
    try:
        file_url = doc.getURL()
        if file_url:  # an empty URL means the document has no location yet
            doc.store()
    except Exception as exc:
        _log(f"Could not save document after push: {exc}")

    summary_lines = [
        f"Created: {results['created']}",
        f"Updated: {results['updated']}",
        f"Conflicts: {results.get('conflicts', 0)}",
        f"Skipped: {results['skipped']}",
    ]
    if results["errors"]:
        summary_lines.append(f"\nErrors ({len(results['errors'])}):")
        for err in results["errors"][:10]:
            summary_lines.append(f" - {err}")
        if len(results["errors"]) > 10:
            summary_lines.append(f" ... and {len(results['errors']) - 10} more")

    _msgbox("Push Complete", "\n".join(summary_lines))
    _log(f"Push complete: {results['created']} created, {results['updated']} updated")
|
|
|
|
|
|
def _cmd_add_item(frame):
    """Completion wizard for adding a new BOM row.

    Ensures an authenticated client (offering the login dialog once), tries
    to derive a project code from the document's file location, determines
    the first row below the sheet's used area and runs the completion
    wizard for that row.  *frame* is not used.
    """
    from silo_calc import completion_wizard as _wizard
    from silo_calc import settings as _settings
    from silo_calc.client import SiloClient

    client = SiloClient()
    if not client.is_authenticated():
        from silo_calc import dialogs

        dialogs.show_login_dialog()
        client = SiloClient()  # fresh instance so it sees the new login
    if not client.is_authenticated():
        return

    doc, sheet = _get_active_sheet()
    if doc is None or sheet is None:
        _msgbox("Add Item", "No active spreadsheet.", box_type="errorbox")
        return

    # The project code is taken to be the path component that directly
    # follows a "sheets" component in the document's file path.  Detection
    # is best-effort: on failure the wizard runs with an empty project
    # code, and the reason is logged instead of being silently dropped.
    project_code = ""
    try:
        file_url = doc.getURL()
        if file_url:
            file_path = uno.fileUrlToSystemPath(file_url)
            parts = file_path.replace("\\", "/").split("/")
            if "sheets" in parts:
                idx = parts.index("sheets")
                if idx + 1 < len(parts):
                    project_code = parts[idx + 1]
    except Exception as exc:
        _log(f"Could not derive project code from document path: {exc}")

    # Extend a cursor over the used area; the new row goes right below it.
    cursor = sheet.createCursor()
    cursor.gotoStartOfUsedArea(False)
    cursor.gotoEndOfUsedArea(True)
    insert_row = cursor.getRangeAddress().EndRow + 1

    ok = _wizard.run_completion_wizard(
        client,
        doc,
        sheet,
        insert_row,
        project_code=project_code,
        schema=_settings.get("default_schema", "kindred-rd"),
    )
    if ok:
        # insert_row is 0-based; the log shows the 1-based row number.
        _log(f"Added new item at row {insert_row + 1}")
|
|
|
|
|
|
def _cmd_refresh(frame):
    """Placeholder for re-pulling the current sheet from the server.

    Currently it only shows a "coming soon" notice.  *frame* is not used.
    """
    _msgbox(title="Refresh", message="Refresh -- coming soon.")
|
|
|
|
|
|
def _cmd_ai_description(frame):
    """Generate an AI description from the seller description in the current row.

    Requires a configured AI client, an active spreadsheet and a selection
    that is not in row 0 (treated as the header).  The generated text is
    shown in a dialog; when the user accepts it, it is written to the
    description column of the selected row.  When the user rejects it, they
    may ask for another attempt.  *frame* is not used.
    """
    from silo_calc import ai_client as _ai
    from silo_calc import dialogs
    from silo_calc import pull as _pull
    from silo_calc import sheet_format as sf

    if not _ai.is_configured():
        _msgbox(
            "AI Describe",
            "OpenRouter API key not configured.\n\n"
            "Set it in Silo Settings or via the OPENROUTER_API_KEY environment variable.",
            box_type="errorbox",
        )
        return

    doc, sheet = _get_active_sheet()
    if doc is None or sheet is None:
        _msgbox("AI Describe", "No active spreadsheet.", box_type="errorbox")
        return

    # The selection object may expose a cell address or only a range
    # address; try the former first and fall back to the range's first row.
    selection = doc.getCurrentController().getSelection()
    try:
        row = selection.getCellAddress().Row
    except AttributeError:
        try:
            row = selection.getRangeAddress().StartRow
        except AttributeError:
            _msgbox("AI Describe", "Select a cell in a BOM row.", box_type="errorbox")
            return

    if row == 0:
        _msgbox(
            "AI Describe", "Select a data row, not the header.", box_type="errorbox"
        )
        return

    def cell_text(column):
        """Return the stripped string content of *column* in the chosen row."""
        return sheet.getCellByPosition(column, row).getString().strip()

    seller_desc = cell_text(sf.COL_SELLER_DESC)
    if not seller_desc:
        _msgbox(
            "AI Describe",
            f"No seller description in column F (row {row + 1}).",
            box_type="errorbox",
        )
        return

    existing_desc = cell_text(sf.COL_DESCRIPTION)
    part_number = cell_text(sf.COL_PN)
    # The first three characters of the part number are passed as the
    # category; shorter part numbers yield an empty category.
    category = "" if len(part_number) < 3 else part_number[:3]

    while True:
        try:
            ai_desc = _ai.generate_description(
                seller_description=seller_desc,
                category=category,
                existing_description=existing_desc,
                part_number=part_number,
            )
        except RuntimeError as e:
            _msgbox("AI Describe Failed", str(e), box_type="errorbox")
            return

        accepted = dialogs.show_ai_description_dialog(seller_desc, ai_desc)
        if accepted is None:
            # Rejected: loop again only on an explicit "yes" / "y".
            retry = dialogs._input_box(
                "AI Describe",
                "Generate again? (yes/no):",
                default="no",
            )
            if retry and retry.strip().lower() in ("yes", "y"):
                continue
            return

        _pull._set_cell_string(sheet, sf.COL_DESCRIPTION, row, accepted)
        _log(f"AI description written to row {row + 1}: {accepted}")
        return
|
|
|
|
|
|
# Dispatch table: command name (the path part of the protocol URL) mapped
# to the handler function that implements it.
_COMMANDS = dict(
    SiloLogin=_cmd_login,
    SiloPullBOM=_cmd_pull_bom,
    SiloPullProject=_cmd_pull_project,
    SiloPush=_cmd_push,
    SiloAddItem=_cmd_add_item,
    SiloRefresh=_cmd_refresh,
    SiloSettings=_cmd_settings,
    SiloAIDescription=_cmd_ai_description,
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UNO Dispatch implementation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SiloDispatch(unohelper.Base, XDispatch):
    """XDispatch implementation bound to one command name and one frame."""

    def __init__(self, command: str, frame):
        self._command = command
        self._frame = frame
        self._listeners = []

    def dispatch(self, url, args):
        """Run the handler registered for this command.

        Any exception raised by the handler is logged with its traceback
        and shown in an error box; it is never propagated to the caller.
        *url* and *args* are not used.
        """
        handler = _COMMANDS.get(self._command)
        if not handler:
            return

        try:
            handler(self._frame)
        except Exception:
            details = traceback.format_exc()
            _log(f"Error in {self._command}:\n{details}")
            try:
                _msgbox(
                    f"Silo Error: {self._command}",
                    details,
                    box_type="errorbox",
                )
            except Exception:
                # Showing the box can itself fail; the traceback has
                # already been logged above.
                pass

    def addStatusListener(self, listener, url):
        """Remember *listener*; *url* is ignored."""
        self._listeners.append(listener)

    def removeStatusListener(self, listener, url):
        """Forget *listener* if it is registered; *url* is ignored."""
        if listener in self._listeners:
            self._listeners.remove(listener)
|
|
|
|
|
|
class SiloDispatchProvider(
    unohelper.Base, XDispatchProvider, XInitialization, XServiceInfo
):
    """ProtocolHandler component for Silo commands.

    LibreOffice instantiates this via com.sun.star.frame.ProtocolHandler
    and calls initialize() with the frame, then queryDispatch() for each
    command URL matching our protocol.
    """

    def __init__(self, ctx):
        self._ctx = ctx
        self._frame = None

    # XInitialization -- called by framework with the Frame
    def initialize(self, args):
        """Store the first initialization argument as the owning frame."""
        if args:
            self._frame = args[0]

    # XDispatchProvider
    def queryDispatch(self, url, target_frame_name, search_flags):
        """Return a dispatcher for *url*, or ``None`` if it is not ours.

        A URL is handled when its protocol equals ``_PROTOCOL`` and its
        path names an entry of ``_COMMANDS``.  *target_frame_name* and
        *search_flags* are ignored.
        """
        if url.Protocol == _PROTOCOL:
            command = url.Path
            if command in _COMMANDS:
                return SiloDispatch(command, self._frame)
        return None

    def queryDispatches(self, requests):
        """Resolve several dispatch requests at once.

        Returns a tuple with one entry per request, in request order; an
        entry is ``None`` when the request is not handled.  A tuple (not a
        list) is returned because that is the documented PyUNO
        representation of a UNO sequence.
        """
        return tuple(
            self.queryDispatch(r.FeatureURL, r.FrameName, r.SearchFlags)
            for r in requests
        )

    # XServiceInfo
    def getImplementationName(self):
        """Return the implementation name used for registration."""
        return _IMPL_NAME

    def supportsService(self, name):
        """Return True if *name* is the one service this component offers."""
        return name == _SERVICE_NAME

    def getSupportedServiceNames(self):
        """Return the supported service names as a one-element tuple."""
        return (_SERVICE_NAME,)
|
|
|
|
|
|
# Component registration: publish SiloDispatchProvider under its
# implementation name together with the service it supports.
g_ImplementationHelper = unohelper.ImplementationHelper()
_supported_services = (_SERVICE_NAME,)
g_ImplementationHelper.addImplementation(
    SiloDispatchProvider, _IMPL_NAME, _supported_services
)
|