LibreOffice Calc extension for Silo PLM integration. Uses shared silo-client package (submodule) for API communication. Changes from monorepo version: - SiloClient class removed from client.py, replaced with CalcSiloSettings adapter + factory function wrapping silo_client.SiloClient - silo_calc_component.py adds silo-client to sys.path - Makefile build-oxt copies silo_client into .oxt for self-contained packaging - All other modules unchanged
432 lines
14 KiB
Python
432 lines
14 KiB
Python
"""Push command -- sync local BOM edits back to the Silo database.
|
|
|
|
Handles:
|
|
- Row classification (new / modified / synced / conflict)
|
|
- Creating new items via the API
|
|
- Updating existing items and BOM entry metadata
|
|
- Auto-tagging new items with the project code
|
|
- Conflict detection against server timestamps
|
|
- Updating row sync state after successful push
|
|
"""
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from . import sheet_format as sf
|
|
from . import sync_engine
|
|
from .client import SiloClient
|
|
|
|
# UNO imports
|
|
try:
|
|
import uno
|
|
|
|
_HAS_UNO = True
|
|
except ImportError:
|
|
_HAS_UNO = False
|
|
|
|
|
|
def _read_sheet_rows(sheet) -> List[List[str]]:
    """Return the sheet contents as a list of rows of cell strings.

    Rows run from the first sheet row through the last row of the used
    area. Every row is at least ``sf.BOM_TOTAL_COLS`` cells wide, even when
    the used area is narrower than that.
    """
    used_area = sheet.createCursor()
    used_area.gotoStartOfUsedArea(False)
    used_area.gotoEndOfUsedArea(True)
    bounds = used_area.getRangeAddress()

    row_count = bounds.EndRow + 1
    # Read at least the full BOM width, regardless of how far the used
    # area actually extends to the right.
    col_count = max(bounds.EndColumn, sf.BOM_TOTAL_COLS - 1) + 1

    table = []
    for row_no in range(row_count):
        # getString() is used for every cell so that all cell types come
        # back as their string form.
        line = [
            sheet.getCellByPosition(col_no, row_no).getString()
            for col_no in range(col_count)
        ]
        # Pad to the full BOM width if the row came out short.
        shortfall = sf.BOM_TOTAL_COLS - len(line)
        if shortfall > 0:
            line.extend([""] * shortfall)
        table.append(line)
    return table
|
|
|
|
|
|
def _detect_project_code(doc) -> str:
    """Try to detect the project code from the document's file path.

    The project code is taken to be the directory that directly follows a
    ``sheets`` path component, i.e. ``.../sheets/<code>/<file>``.

    Args:
        doc: Document object exposing ``getURL()``.

    Returns:
        The detected project code, or ``""`` when the document has no URL,
        the path has no ``sheets`` component followed by a directory, or the
        URL could not be resolved to a path.
    """
    try:
        file_url = doc.getURL()
        if not file_url:
            return ""
        file_path = uno.fileUrlToSystemPath(file_url)
    except Exception:
        # Detection is best-effort: an unresolvable URL (or a missing UNO
        # runtime) must never abort the push, it only disables auto-tagging.
        return ""

    # Normalise Windows separators so one split handles both path styles.
    parts = file_path.replace("\\", "/").split("/")
    if "sheets" not in parts:
        return ""

    idx = parts.index("sheets")
    # The last component is the document's own file name. Require at least
    # one directory between "sheets" and that file name, otherwise a file
    # saved directly in "sheets/" would yield its file name as the code.
    if idx + 2 < len(parts):
        return parts[idx + 1]
    return ""
|
|
|
|
|
|
def _fetch_server_timestamps(
    client: SiloClient, part_numbers: List[str]
) -> Dict[str, str]:
    """Fetch ``updated_at`` timestamps for a list of part numbers.

    Each distinct part number is requested at most once, even when it
    appears several times in *part_numbers*.

    Args:
        client: API client used to look up each item.
        part_numbers: Part numbers to look up; blank entries are ignored.

    Returns:
        A mapping of part number to the item's ``updated_at`` value (``""``
        when the item has none). Part numbers whose lookup raised
        ``RuntimeError`` are left out of the mapping.
    """
    timestamps: Dict[str, str] = {}
    for pn in part_numbers:
        # Skip blanks and part numbers that were already resolved, so a part
        # listed on several rows costs a single round-trip.
        if not pn or pn in timestamps:
            continue
        try:
            item = client.get_item(pn)
        except RuntimeError:
            # Best effort: a failed lookup simply yields no timestamp.
            continue
        timestamps[pn] = item.get("updated_at", "")
    return timestamps
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Push execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def push_sheet(
    client: SiloClient,
    doc,
    sheet,
    schema: str = "kindred-rd",
) -> Dict[str, Any]:
    """Execute a push for the active BOM sheet.

    Reads every row of *sheet*, classifies the rows through ``sync_engine``,
    then creates new items, updates modified ones and marks conflicting
    rows. Per-row ``RuntimeError`` failures are collected in the summary
    instead of being raised.

    Args:
        client: API client used for all server calls.
        doc: Document owning the sheet; its file path is used to detect the
            project code for auto-tagging.
        sheet: The BOM sheet to push.
        schema: Schema name sent when creating new items.

    Returns:
        A summary dict with the keys ``created``, ``updated``, ``errors``
        (list of messages), ``skipped`` and ``conflicts``.

    Raises:
        RuntimeError: If the UNO API is not available.
    """
    if not _HAS_UNO:
        raise RuntimeError("UNO API not available")

    rows = _read_sheet_rows(sheet)
    if not rows:
        # Same keys as the regular summary so callers can index it blindly.
        return {
            "created": 0,
            "updated": 0,
            "errors": [],
            "skipped": 0,
            "conflicts": 0,
        }

    project_code = _detect_project_code(doc)

    # Classify all rows
    classified = sync_engine.classify_rows(rows)

    # Collect part numbers for server timestamp check
    modified_pns = [
        cells[sf.COL_PN].strip()
        for _, status, cells in classified
        if status == sync_engine.STATUS_MODIFIED and cells[sf.COL_PN].strip()
    ]
    server_ts = _fetch_server_timestamps(client, modified_pns)

    # Build diff
    diff = sync_engine.build_push_diff(classified, server_timestamps=server_ts)

    results = {
        "created": 0,
        "updated": 0,
        "errors": [],
        "skipped": diff["unchanged"],
        "conflicts": len(diff["conflicts"]),
    }

    # -- Handle new rows: create items in the database ----------------------
    for row_info in diff["new"]:
        _push_new_row(
            client, sheet, rows, row_info["row_index"], schema, project_code, results
        )

    # -- Handle modified rows: update items ---------------------------------
    for row_info in diff["modified"]:
        _push_modified_row(
            client, sheet, rows, row_info["row_index"], project_code, results
        )

    # -- Mark conflicts -----------------------------------------------------
    for row_info in diff["conflicts"]:
        _set_row_status(sheet, row_info["row_index"], sync_engine.STATUS_CONFLICT)

    return results


def _parse_unit_cost(text: str) -> Optional[float]:
    """Parse a unit-cost cell, ignoring ``$`` and ``,``; None if blank/invalid."""
    if not text:
        return None
    try:
        return float(text.replace("$", "").replace(",", ""))
    except ValueError:
        return None


def _parse_quantity(text: str) -> float:
    """Parse a quantity cell; falls back to 1.0 when blank or not a number."""
    if not text:
        return 1.0
    try:
        return float(text)
    except ValueError:
        return 1.0


def _parent_part_number(cells: List[str]) -> str:
    """Return the stripped parent part number, or "" if the row is too short."""
    if len(cells) > sf.COL_PARENT_PN:
        return cells[sf.COL_PARENT_PN].strip()
    return ""


def _push_new_row(
    client: SiloClient,
    sheet,
    rows: List[List[str]],
    row_idx: int,
    schema: str,
    project_code: str,
    results: Dict[str, Any],
) -> None:
    """Push one row classified as new, recording the outcome in *results*."""
    cells = rows[row_idx]
    pn = cells[sf.COL_PN].strip()
    desc = cells[sf.COL_DESCRIPTION].strip()
    source = cells[sf.COL_SOURCE].strip()
    sourcing_link = cells[sf.COL_SOURCING_LINK].strip()
    unit_cost = _parse_unit_cost(cells[sf.COL_UNIT_COST].strip())
    qty = _parse_quantity(cells[sf.COL_QTY].strip())
    parent_pn = _parent_part_number(cells)

    if not desc:
        results["errors"].append(
            f"Row {row_idx + 1}: description is required for new items"
        )
        _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
        return

    try:
        if pn:
            # Check if item already exists. Only the lookup itself is
            # guarded: a failure while linking or writing back an existing
            # item must be reported, not turned into a create attempt.
            try:
                existing = client.get_item(pn)
            except RuntimeError:
                pass  # Item doesn't exist, create it
            else:
                # Item exists -- just update BOM relationship if parent is known
                if parent_pn:
                    _update_bom_relationship(
                        client, parent_pn, pn, qty, unit_cost, cells
                    )
                results["updated"] += 1
                _update_row_after_push(sheet, rows, row_idx, existing)
                return

        # Detect category from PN prefix (e.g., F01-0001 -> F01)
        category = pn[:3] if len(pn) >= 3 else ""

        # Create the item
        create_data = {
            "schema": schema,
            "category": category,
            "description": desc,
        }
        if source:
            create_data["sourcing_type"] = source
        if sourcing_link:
            create_data["sourcing_link"] = sourcing_link
        if unit_cost is not None:
            create_data["standard_cost"] = unit_cost
        if project_code:
            create_data["projects"] = [project_code]

        created = client.create_item(**create_data)
        created_pn = created.get("part_number", pn)

        # Update the PN cell if it was auto-generated
        if not pn and created_pn:
            from . import pull as _pull

            _pull._set_cell_string(sheet, sf.COL_PN, row_idx, created_pn)
            cells[sf.COL_PN] = created_pn

        # Add to parent's BOM if parent is known
        if parent_pn:
            _update_bom_relationship(
                client, parent_pn, created_pn, qty, unit_cost, cells
            )

        # Auto-tag with project (best effort)
        if project_code:
            try:
                client.add_item_projects(created_pn, [project_code])
            except RuntimeError:
                pass

        # Set property columns via revision update (if any properties set)
        # NOTE(review): this may issue another update after `created` was
        # captured, so the updated_at stored below could lag the server --
        # confirm this cannot cause a false conflict on the next push.
        _push_properties(client, created_pn, cells)

        results["created"] += 1
        _update_row_after_push(sheet, rows, row_idx, created)

    except RuntimeError as e:
        results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}")
        _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)


def _push_modified_row(
    client: SiloClient,
    sheet,
    rows: List[List[str]],
    row_idx: int,
    project_code: str,
    results: Dict[str, Any],
) -> None:
    """Push one row classified as modified, recording the outcome in *results*."""
    cells = rows[row_idx]
    pn = cells[sf.COL_PN].strip()
    parent_pn = _parent_part_number(cells)

    if not pn:
        results["errors"].append(
            f"Row {row_idx + 1}: no part number for modified row"
        )
        # Mark the row like every other error path does.
        _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
        return

    try:
        # Update item fields
        update_fields = {}
        desc = cells[sf.COL_DESCRIPTION].strip()
        if desc:
            update_fields["description"] = desc
        source = cells[sf.COL_SOURCE].strip()
        if source:
            update_fields["sourcing_type"] = source
        # The sourcing link is always sent, including when the cell is blank,
        # so there is always at least one field to update.
        update_fields["sourcing_link"] = cells[sf.COL_SOURCING_LINK].strip()

        unit_cost = _parse_unit_cost(cells[sf.COL_UNIT_COST].strip())
        if unit_cost is not None:
            update_fields["standard_cost"] = unit_cost

        updated = client.update_item(pn, **update_fields)

        # Update BOM relationship
        qty = _parse_quantity(cells[sf.COL_QTY].strip())
        if parent_pn:
            _update_bom_relationship(client, parent_pn, pn, qty, unit_cost, cells)

        # Update properties
        # NOTE(review): this may issue another update after `updated` was
        # captured, so the updated_at stored below could lag the server --
        # confirm this cannot cause a false conflict on the next push.
        _push_properties(client, pn, cells)

        # Auto-tag with project (best effort), skipping tags already present
        if project_code:
            try:
                existing_projects = client.get_item_projects(pn)
                existing_codes = (
                    {p.get("code", "") for p in existing_projects}
                    if isinstance(existing_projects, list)
                    else set()
                )
                if project_code not in existing_codes:
                    client.add_item_projects(pn, [project_code])
            except RuntimeError:
                pass

        results["updated"] += 1
        _update_row_after_push(sheet, rows, row_idx, updated)

    except RuntimeError as e:
        results["errors"].append(f"Row {row_idx + 1} ({pn}): {e}")
        _set_row_status(sheet, row_idx, sync_engine.STATUS_ERROR)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _update_bom_relationship(
    client: SiloClient,
    parent_pn: str,
    child_pn: str,
    qty: float,
    unit_cost: Optional[float],
    cells: List[str],
):
    """Update the BOM entry linking *parent_pn* to *child_pn*, or add it.

    The entry carries the quantity plus optional metadata taken from the
    row: seller description, unit cost and sourcing link. An update is
    attempted first; if it raises ``RuntimeError`` the entry is added
    instead. A failure of the add is ignored as well (best effort).
    """

    def _text_at(col: int) -> str:
        # Rows shorter than the requested column read as blank.
        return cells[col].strip() if len(cells) > col else ""

    extras = {}
    vendor_text = _text_at(sf.COL_SELLER_DESC)
    if vendor_text:
        extras["seller_description"] = vendor_text
    if unit_cost is not None:
        extras["unit_cost"] = unit_cost
    link = _text_at(sf.COL_SOURCING_LINK)
    if link:
        extras["sourcing_link"] = link

    # Send None rather than an empty dict when there is nothing to attach.
    payload = extras or None

    try:
        # The entry may already exist, so try updating it first.
        client.update_bom_entry(
            parent_pn, child_pn, quantity=qty, metadata=payload
        )
        return
    except RuntimeError:
        pass

    # The update failed -- fall back to creating the entry.
    try:
        client.add_bom_entry(parent_pn, child_pn, quantity=qty, metadata=payload)
    except RuntimeError:
        pass  # Best effort
|
|
|
|
|
|
def _push_properties(client: SiloClient, pn: str, cells: List[str]):
    """Push the row's property columns for item *pn* to the server.

    Best effort only: of the collected property values, just
    ``long_description`` is sent (through an item update); the remaining
    values are gathered but not transmitted by this function. A
    ``RuntimeError`` from the update is ignored.
    """
    # Gather the non-blank property cells, keyed by their database name.
    collected = {}
    for offset, header in enumerate(sf.BOM_PROPERTY_HEADERS):
        column = sf.COL_PROP_START + offset
        if column >= len(cells):
            continue
        text = cells[column].strip()
        if not text:
            continue
        key = sf.PROPERTY_KEY_MAP.get(header, "")
        if not key:
            # Header without a database key mapping: nothing to store.
            continue
        collected[key] = text

    # Only the long description can be written via the item endpoint here.
    description = collected.pop("long_description", None)
    if not description:
        return

    try:
        client.update_item(pn, long_description=description)
    except RuntimeError:
        pass
|
|
|
|
|
|
def _update_row_after_push(
    sheet, rows: List[List[str]], row_idx: int, item: Dict[str, Any]
):
    """Write the sync-tracking columns of a row after a successful push.

    Fills in the part number when the row had none and *item* supplies one,
    marks the row as synced through ``sync_engine``, then mirrors the hash,
    status and timestamp cells to the sheet and applies the "synced" row
    background.
    """
    from . import pull as _pull

    row = rows[row_idx]

    # Adopt the part number from the server when the row had none.
    assigned_pn = item.get("part_number", "")
    if assigned_pn and not row[sf.COL_PN].strip():
        row[sf.COL_PN] = assigned_pn
        _pull._set_cell_string(sheet, sf.COL_PN, row_idx, assigned_pn)

    # Update the in-memory row's sync state, then copy it to the sheet.
    sync_engine.update_row_sync_state(
        row,
        sync_engine.STATUS_SYNCED,
        updated_at=item.get("updated_at", ""),
    )
    for col in (sf.COL_ROW_HASH, sf.COL_ROW_STATUS, sf.COL_UPDATED_AT):
        _pull._set_cell_string(sheet, col, row_idx, row[col])

    synced_color = _pull._STATUS_COLORS["synced"]
    _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, synced_color)
|
|
|
|
|
|
def _set_row_status(sheet, row_idx: int, status: str):
    """Write *status* into the row's status cell and recolour the row.

    The row background is only changed when a (truthy) colour is registered
    for *status*.
    """
    from . import pull as _pull

    _pull._set_cell_string(sheet, sf.COL_ROW_STATUS, row_idx, status)

    background = _pull._STATUS_COLORS.get(status)
    if not background:
        return
    _pull._set_row_bg(sheet, row_idx, sf.BOM_TOTAL_COLS, background)
|