# LibreOffice Calc extension for Silo PLM integration.
# Uses the shared silo-client package (submodule) for API communication.
#
# Changes from the monorepo version:
# - SiloClient class removed from client.py, replaced with a CalcSiloSettings
#   adapter + factory function wrapping silo_client.SiloClient
# - silo_calc_component.py adds silo-client to sys.path
# - Makefile build-oxt copies silo_client into the .oxt for self-contained
#   packaging
# - All other modules unchanged
#
# 161 lines, 4.9 KiB, Python
"""Row hashing, diff classification, and sync state tracking.

Used by push/pull commands to detect which rows have been modified locally
since the last pull, and to detect conflicts with server-side changes.
"""
|
|
|
|
import hashlib
|
|
import json
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from . import sheet_format as sf
|
|
|
|
# Row statuses.
# classify_row() in this module returns SYNCED, MODIFIED or NEW.
STATUS_SYNCED = "synced"  # stored hash equals the hash of the current data
STATUS_MODIFIED = "modified"  # stored hash differs from the current data
STATUS_NEW = "new"  # row has data but no stored hash
# The two statuses below are not produced by the classifiers in this module;
# presumably they are written by the push/pull commands -- confirm at callers.
STATUS_ERROR = "error"
STATUS_CONFLICT = "conflict"
|
|
|
|
|
|
def compute_row_hash(cells: List[str]) -> str:
    """Return the SHA-256 hex digest of the data columns of a row.

    Only columns ``0 .. sf.COL_PROP_END - 1`` are hashed; the sync tracking
    columns that follow are ignored.

    Each cell is converted with ``str()`` and stripped of surrounding
    whitespace.  Falsy cells (``""``, ``None``, and also a numeric ``0``)
    are normalised to the empty string.

    Rows shorter than ``sf.COL_PROP_END`` are hashed as if they were padded
    with empty cells, so the digest does not depend on whether trailing
    blank cells were materialised by the caller.
    """
    width = sf.COL_PROP_END
    # Copy the data slice so padding never touches the caller's row.
    data_cells = list(cells[:width])
    # A negative multiplier yields an empty list, so full rows are untouched.
    data_cells.extend([""] * (width - len(data_cells)))

    normalised = [str(c).strip() if c else "" for c in data_cells]
    # NOTE: the tab separator is part of the stored-hash format; changing it
    # would invalidate every hash already written to a sheet.
    raw = "\t".join(normalised).encode("utf-8")
    return hashlib.sha256(raw).hexdigest()
|
|
|
|
|
|
def classify_row(cells: List[str]) -> str:
    """Return the sync status of a single row.

    The result is one of:

    * ``""`` -- no stored hash and no data in the data columns (blank row),
    * ``STATUS_NEW`` -- no stored hash but at least one non-blank data cell,
    * ``STATUS_SYNCED`` -- the recomputed hash equals the stored hash,
    * ``STATUS_MODIFIED`` -- the recomputed hash differs from the stored hash.

    Side effect: *cells* is padded in place with empty strings up to
    ``sf.BOM_TOTAL_COLS`` entries.  Pass a copy if the original row must
    stay untouched.
    """
    # Ensure the sync columns can be indexed (in-place padding is kept for
    # backward compatibility with callers that rely on it).
    shortfall = sf.BOM_TOTAL_COLS - len(cells)
    if shortfall > 0:
        cells.extend([""] * shortfall)

    # str() mirrors compute_row_hash so non-string cell values do not raise.
    hash_cell = cells[sf.COL_ROW_HASH]
    stored_hash = str(hash_cell).strip() if hash_cell else ""

    # No hash -> the row was never pulled from the server.
    if not stored_hash:
        has_data = any(
            str(cell).strip() for cell in cells[: sf.COL_PROP_END] if cell
        )
        return STATUS_NEW if has_data else ""

    # Compare the current data against the hash recorded at the last sync.
    if compute_row_hash(cells) == stored_hash:
        return STATUS_SYNCED
    return STATUS_MODIFIED
|
|
|
|
|
|
def classify_rows(all_rows: List[List[str]]) -> List[Tuple[int, str, List[str]]]:
    """Classify each row of a sheet and keep the ones that carry a status.

    The first row (index 0) is treated as the header and skipped.  Rows for
    which ``classify_row`` returns an empty status are dropped.

    Returns a list of ``(row_index, status, cells)`` tuples, where *cells*
    is a fresh list copy of the input row.
    """
    indexed_rows = enumerate(all_rows)
    next(indexed_rows, None)  # discard the header row, if there is one

    classified = []
    for row_index, row in indexed_rows:
        # classify_row pads its argument in place, so hand it a throwaway copy.
        row_status = classify_row(list(row))
        if not row_status:
            continue
        classified.append((row_index, row_status, list(row)))
    return classified
|
|
|
|
|
|
def _cell_text(cells: List[str], index: int) -> str:
    """Return ``cells[index]`` as stripped text, or ``""`` if missing or blank."""
    if index >= len(cells):
        return ""
    value = cells[index]
    return str(value).strip() if value else ""


def build_push_diff(
    classified: List[Tuple[int, str, List[str]]],
    server_timestamps: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build a push diff from classified rows.

    *classified* is a list of ``(row_index, status, cells)`` tuples, as
    returned by ``classify_rows``.

    *server_timestamps* maps part numbers to their server ``updated_at``
    values, used for conflict detection.  A modified row is reported as a
    conflict only when both its stored timestamp and the server timestamp
    are non-empty and differ.

    Returns a dict with the keys:

    * ``"new"`` -- list of row-info dicts for ``STATUS_NEW`` rows,
    * ``"modified"`` -- list of row-info dicts for modified rows without a
      detected conflict,
    * ``"conflicts"`` -- list of row-info dicts (with extra ``local_ts`` and
      ``server_ts`` keys) for modified rows whose server copy changed,
    * ``"unchanged"`` -- the number (``int``) of ``STATUS_SYNCED`` rows.

    Each row-info dict has the keys ``row_index``, ``part_number``,
    ``description`` and ``cells`` (the data columns only).  Rows with any
    other status are ignored.
    """
    server_ts = server_timestamps or {}
    new_rows: List[Dict[str, Any]] = []
    modified_rows: List[Dict[str, Any]] = []
    conflicts: List[Dict[str, Any]] = []
    unchanged = 0

    for row_idx, status, cells in classified:
        if status == STATUS_SYNCED:
            unchanged += 1
            continue
        if status not in (STATUS_NEW, STATUS_MODIFIED):
            # Other statuses are not part of the diff; skip before doing
            # any per-row work.
            continue

        pn = _cell_text(cells, sf.COL_PN)
        row_info: Dict[str, Any] = {
            "row_index": row_idx,
            "part_number": pn,
            "description": _cell_text(cells, sf.COL_DESCRIPTION),
            "cells": cells[: sf.COL_PROP_END],
        }

        if status == STATUS_NEW:
            new_rows.append(row_info)
            continue

        # STATUS_MODIFIED: conflict if the server changed since we pulled.
        stored_ts = _cell_text(cells, sf.COL_UPDATED_AT)
        server_updated = server_ts.get(pn, "")
        if stored_ts and server_updated and server_updated != stored_ts:
            row_info["local_ts"] = stored_ts
            row_info["server_ts"] = server_updated
            conflicts.append(row_info)
        else:
            modified_rows.append(row_info)

    return {
        "new": new_rows,
        "modified": modified_rows,
        "conflicts": conflicts,
        "unchanged": unchanged,
    }
|
|
|
|
|
|
def update_row_sync_state(
    cells: List[str],
    status: str,
    updated_at: str = "",
    parent_pn: str = "",
) -> List[str]:
    """Write the sync tracking columns of a row in place and return the row.

    *cells* is first padded with empty strings up to ``sf.BOM_TOTAL_COLS``
    entries.  The hash column is then set to the hash of the current data
    columns and the status column to *status*.  The ``updated_at`` and
    parent part-number columns are only overwritten when the corresponding
    argument is non-empty.

    Returns the same list object that was passed in.
    """
    missing = sf.BOM_TOTAL_COLS - len(cells)
    if missing > 0:
        cells.extend([""] * missing)

    row_hash = compute_row_hash(cells)
    cells[sf.COL_ROW_HASH] = row_hash
    cells[sf.COL_ROW_STATUS] = status

    # Empty arguments mean "leave the existing value alone".
    if updated_at:
        cells[sf.COL_UPDATED_AT] = updated_at
    if parent_pn:
        cells[sf.COL_PARENT_PN] = parent_pn

    return cells
|