"""Pull commands -- populate LibreOffice Calc sheets from Silo API data.

This module handles the UNO cell-level work for SiloPullBOM and
SiloPullProject.  It fetches data via the SiloClient, then writes cells
with proper formatting, formulas, hidden columns, and row hash tracking.
"""

from typing import Any, Dict, List, Optional

from . import sheet_format as sf
from . import sync_engine
from .client import SiloClient

# UNO imports -- only available inside LibreOffice
try:
    import uno
    from com.sun.star.beans import PropertyValue
    from com.sun.star.table import CellHoriJustify

    _HAS_UNO = True
except ImportError:
    _HAS_UNO = False

# ---------------------------------------------------------------------------
# Colour helpers (UNO uses 0xRRGGBB integers)
# ---------------------------------------------------------------------------


def _rgb_int(r: int, g: int, b: int) -> int:
    """Pack three 0-255 channel values into one 0xRRGGBB integer."""
    return (r << 16) | (g << 8) | b


_HEADER_BG = _rgb_int(68, 114, 196)  # steel blue
_HEADER_FG = _rgb_int(255, 255, 255)  # white text

_STATUS_COLORS = {k: _rgb_int(*v) for k, v in sf.STATUS_COLORS.items()}

# Depth passed to ``client.get_bom_expanded`` when pulling a BOM.
_BOM_EXPAND_DEPTH = 10

# Revision property keys written to the Items sheet, in column order,
# starting at column ``_ITEM_PROP_COL_START``.
_ITEM_PROPERTY_KEYS = (
    "manufacturer",
    "manufacturer_pn",
    "supplier",
    "supplier_pn",
    "lead_time_days",
    "minimum_order_qty",
    "lifecycle_status",
    "rohs_compliant",
    "country_of_origin",
    "material",
    "finish",
    "notes",
)
_ITEM_PROP_COL_START = 8
_ITEM_CREATED_COL = 20
_ITEM_UPDATED_COL = 21

# ---------------------------------------------------------------------------
# Cell writing helpers
# ---------------------------------------------------------------------------


def _set_cell_string(sheet, col: int, row: int, value: str):
    """Write *value* as text; any falsy value is written as an empty string."""
    cell = sheet.getCellByPosition(col, row)
    cell.setString(str(value) if value else "")


def _set_cell_float(sheet, col: int, row: int, value, fmt: str = ""):
    """Write *value* as a number, falling back to text if it is not numeric.

    ``fmt`` is accepted for interface compatibility but is not used here.
    """
    cell = sheet.getCellByPosition(col, row)
    try:
        cell.setValue(float(value))
    except (ValueError, TypeError):
        cell.setString(str(value) if value else "")


def _set_cell_formula(sheet, col: int, row: int, formula: str):
    """Assign *formula* (e.g. ``"=A2*B2"``) to the cell at (col, row)."""
    cell = sheet.getCellByPosition(col, row)
    cell.setFormula(formula)


def _set_row_bg(sheet, row: int, col_count: int, color: int):
    """Set background colour on the first *col_count* cells of a row."""
    rng = sheet.getCellRangeByPosition(0, row, col_count - 1, row)
    rng.CellBackColor = color


def _format_header_row(sheet, col_count: int):
    """Bold white text on blue background for row 0."""
    rng = sheet.getCellRangeByPosition(0, 0, col_count - 1, 0)
    rng.CellBackColor = _HEADER_BG
    rng.CharColor = _HEADER_FG
    rng.CharWeight = 150  # com.sun.star.awt.FontWeight.BOLD


def _freeze_row(doc, row: int = 1):
    """Freeze panes at the given row (default: freeze header).

    The freeze is applied through the document's current controller.
    """
    ctrl = doc.getCurrentController()
    ctrl.freezeAtPosition(0, row)


def _hide_columns(sheet, start_col: int, end_col: int):
    """Hide columns ``start_col`` up to, but NOT including, ``end_col``.

    The range is half-open: callers pass an exclusive end such as
    ``sf.BOM_TOTAL_COLS``.
    """
    cols = sheet.getColumns()
    for i in range(start_col, end_col):
        cols.getByIndex(i).IsVisible = False


def _set_column_width(sheet, col: int, width_mm100: int):
    """Set column width in 1/100 mm."""
    cols = sheet.getColumns()
    cols.getByIndex(col).Width = width_mm100


def _ext_cost_formula(row: int) -> str:
    """Return the ``=UnitCost*Qty`` formula for the 0-based sheet *row*.

    Cell references in formulas are 1-based, hence ``row + 1``.
    """
    unit_col = sf.col_letter(sf.COL_UNIT_COST)
    qty_col = sf.col_letter(sf.COL_QTY)
    return f"={unit_col}{row + 1}*{qty_col}{row + 1}"


# ---------------------------------------------------------------------------
# BOM data helpers
# ---------------------------------------------------------------------------


def _get_meta(entry: Dict, key: str, default: str = "") -> str:
    """Extract a value from a BOM entry's metadata dict.

    Missing or falsy values yield *default*.
    """
    meta = entry.get("metadata") or {}
    val = meta.get(key, default)
    return str(val) if val else default


def _get_meta_float(entry: Dict, key: str) -> Optional[float]:
    """Return a metadata value as a float, or None if absent or non-numeric."""
    meta = entry.get("metadata") or {}
    val = meta.get(key)
    if val is not None:
        try:
            return float(val)
        except (ValueError, TypeError):
            pass
    return None


def _get_property(rev: Optional[Dict], key: str) -> str:
    """Extract a property from a revision's properties dict.

    Returns an empty string when *rev* is missing or the value is falsy.
    """
    if not rev:
        return ""
    props = rev.get("properties") or {}
    val = props.get(key, "")
    return str(val) if val else ""


# ---------------------------------------------------------------------------
# SiloPullBOM
# ---------------------------------------------------------------------------


def _stamp_sync_state(
    sheet, row: int, cells: List[str], updated_at: str, parent_pn: str
):
    """Mark *cells* as synced and mirror the tracking values into the sheet.

    ``sync_engine.update_row_sync_state`` is expected to fill the hash,
    status, updated-at and parent-PN slots of *cells* in place (those slots
    are read back immediately afterwards).  The row is then coloured with
    the "synced" status colour.
    """
    sync_engine.update_row_sync_state(
        cells,
        sync_engine.STATUS_SYNCED,
        updated_at=updated_at,
        parent_pn=parent_pn,
    )
    _set_cell_string(sheet, sf.COL_ROW_HASH, row, cells[sf.COL_ROW_HASH])
    _set_cell_string(sheet, sf.COL_ROW_STATUS, row, cells[sf.COL_ROW_STATUS])
    _set_cell_string(sheet, sf.COL_UPDATED_AT, row, cells[sf.COL_UPDATED_AT])
    _set_row_bg(sheet, row, sf.BOM_TOTAL_COLS, _STATUS_COLORS["synced"])


def _write_subassembly_row(
    sheet,
    row: int,
    parent_pn: str,
    bom_entries: List[Dict],
    item_cache: Dict[str, Dict],
    schema: str,
):
    """Write the section-header (sub-assembly label) row for *parent_pn*."""
    parent_item = item_cache.get(parent_pn, {})
    label = parent_item.get("description", parent_pn)

    _set_cell_string(sheet, sf.COL_ITEM, row, label)
    _set_cell_float(sheet, sf.COL_LEVEL, row, 0)
    _set_cell_string(sheet, sf.COL_SOURCE, row, "M")
    _set_cell_string(sheet, sf.COL_PN, row, parent_pn)

    # Compute sub-assembly cost from children if available
    parent_cost = _compute_subassembly_cost(bom_entries, parent_pn, item_cache)
    if parent_cost is not None:
        _set_cell_float(sheet, sf.COL_UNIT_COST, row, parent_cost)
    _set_cell_float(sheet, sf.COL_QTY, row, 1)
    _set_cell_formula(sheet, sf.COL_EXT_COST, row, _ext_cost_formula(row))
    _set_cell_string(sheet, sf.COL_SCHEMA, row, schema)

    # Sync tracking for parent row (cost and quantity are not part of it)
    parent_cells = [""] * sf.BOM_TOTAL_COLS
    parent_cells[sf.COL_ITEM] = label
    parent_cells[sf.COL_LEVEL] = "0"
    parent_cells[sf.COL_SOURCE] = "M"
    parent_cells[sf.COL_PN] = parent_pn
    parent_cells[sf.COL_SCHEMA] = schema
    _stamp_sync_state(
        sheet,
        row,
        parent_cells,
        updated_at=parent_item.get("updated_at", ""),
        parent_pn="",
    )


def _write_bom_entry_row(
    sheet,
    row: int,
    entry: Dict,
    child_item: Dict,
    child_rev: Optional[Dict],
    schema: str,
):
    """Write one BOM child row: visible cells, property cells, sync cells."""
    depth = entry.get("depth", 0)
    child_pn = entry.get("child_part_number", "")
    parent_pn = entry.get("parent_part_number", "")
    quantity = entry.get("quantity")

    # BOM-entry metadata cost wins; the item's standard cost is the fallback.
    unit_cost = _get_meta_float(entry, "unit_cost")
    if unit_cost is None:
        unit_cost = child_item.get("standard_cost")

    sourcing_type = child_item.get("sourcing_type", "")
    description = child_item.get("description", "")
    seller_desc = _get_meta(entry, "seller_description")
    sourcing_link = child_item.get("sourcing_link", "")

    # Item column: blank for children (name is in the section header)
    _set_cell_string(sheet, sf.COL_ITEM, row, "")
    _set_cell_float(sheet, sf.COL_LEVEL, row, depth)
    _set_cell_string(sheet, sf.COL_SOURCE, row, sourcing_type)
    _set_cell_string(sheet, sf.COL_PN, row, child_pn)
    _set_cell_string(sheet, sf.COL_DESCRIPTION, row, description)
    _set_cell_string(sheet, sf.COL_SELLER_DESC, row, seller_desc)
    if unit_cost is not None:
        _set_cell_float(sheet, sf.COL_UNIT_COST, row, unit_cost)
    if quantity is not None:
        _set_cell_float(sheet, sf.COL_QTY, row, quantity)
    _set_cell_formula(sheet, sf.COL_EXT_COST, row, _ext_cost_formula(row))
    _set_cell_string(sheet, sf.COL_SOURCING_LINK, row, sourcing_link)
    _set_cell_string(sheet, sf.COL_SCHEMA, row, schema)

    # -- Property columns ---------------------------------------------------
    prop_values = _build_property_cells(child_item, child_rev, entry)
    for i, val in enumerate(prop_values):
        if val:
            _set_cell_string(sheet, sf.COL_PROP_START + i, row, val)

    # -- Sync tracking ------------------------------------------------------
    # NOTE(review): a zero unit cost / quantity is written to the cell above
    # but tracked as "" here; confirm this matches how sync_engine re-reads
    # the row before changing either side.
    row_cells = [""] * sf.BOM_TOTAL_COLS
    row_cells[sf.COL_LEVEL] = str(depth)
    row_cells[sf.COL_SOURCE] = sourcing_type
    row_cells[sf.COL_PN] = child_pn
    row_cells[sf.COL_DESCRIPTION] = description
    row_cells[sf.COL_SELLER_DESC] = seller_desc
    row_cells[sf.COL_UNIT_COST] = str(unit_cost) if unit_cost else ""
    row_cells[sf.COL_QTY] = str(quantity) if quantity else ""
    row_cells[sf.COL_SOURCING_LINK] = sourcing_link
    row_cells[sf.COL_SCHEMA] = schema
    for i, val in enumerate(prop_values):
        row_cells[sf.COL_PROP_START + i] = val

    _stamp_sync_state(
        sheet,
        row,
        row_cells,
        updated_at=child_item.get("updated_at", ""),
        parent_pn=parent_pn,
    )
    _set_cell_string(sheet, sf.COL_PARENT_PN, row, row_cells[sf.COL_PARENT_PN])


def _apply_bom_layout(doc, sheet):
    """Freeze the header, hide property/sync columns, set column widths."""
    _freeze_row(doc, 1)
    _hide_columns(sheet, sf.COL_PROP_START, sf.COL_PROP_END)  # property cols
    _hide_columns(sheet, sf.COL_SYNC_START, sf.BOM_TOTAL_COLS)  # sync cols

    # Set reasonable column widths for visible columns (in 1/100 mm)
    widths = {
        sf.COL_ITEM: 4500,
        sf.COL_LEVEL: 1200,
        sf.COL_SOURCE: 1500,
        sf.COL_PN: 2500,
        sf.COL_DESCRIPTION: 5000,
        sf.COL_SELLER_DESC: 6000,
        sf.COL_UNIT_COST: 2200,
        sf.COL_QTY: 1200,
        sf.COL_EXT_COST: 2200,
        sf.COL_SOURCING_LINK: 5000,
        sf.COL_SCHEMA: 1500,
    }
    for col, width in widths.items():
        _set_column_width(sheet, col, width)


def pull_bom(
    client: SiloClient,
    doc,
    sheet,
    assembly_pn: str,
    project_code: str = "",
    schema: str = "kindred-rd",
):
    """Fetch an expanded BOM and populate *sheet* with formatted data.

    Parameters
    ----------
    client : SiloClient
    doc : XSpreadsheetDocument
    sheet : XSpreadsheet (the target sheet to populate)
    assembly_pn : str (top-level assembly part number)
    project_code : str (project code for auto-tagging, optional)
    schema : str

    Returns
    -------
    int
        Number of sheet rows consumed below the header row.  This counts
        child rows, sub-assembly section rows and blank separator rows.

    Raises
    ------
    RuntimeError
        If UNO is unavailable or the API returns no BOM entries.
    """
    if not _HAS_UNO:
        raise RuntimeError("UNO API not available -- must run inside LibreOffice")

    # Fetch expanded BOM
    bom_entries = client.get_bom_expanded(assembly_pn, depth=_BOM_EXPAND_DEPTH)
    if not bom_entries:
        raise RuntimeError(f"No BOM entries found for {assembly_pn}")

    # Build a cache of items and their latest revisions for property lookup.
    # Lookup failures are tolerated: the row is then written with blanks.
    item_cache: Dict[str, Dict] = {}
    rev_cache: Dict[str, Dict] = {}

    all_pns = {
        pn
        for e in bom_entries
        for pn in (e.get("child_part_number", ""), e.get("parent_part_number", ""))
        if pn
    }
    for pn in all_pns:
        try:
            item_cache[pn] = client.get_item(pn)
        except RuntimeError:
            item_cache[pn] = {}
        try:
            revisions = client.get_revisions(pn)
            if revisions:
                rev_cache[pn] = revisions[0]  # newest first
        except RuntimeError:
            pass

    # -- Write header row ---------------------------------------------------
    for col_idx, header in enumerate(sf.BOM_ALL_HEADERS):
        _set_cell_string(sheet, col_idx, 0, header)
    _format_header_row(sheet, sf.BOM_TOTAL_COLS)

    # -- Group entries by parent for section headers ------------------------
    # BOM entries come back in tree order (parent then children).
    # We insert section header rows for each depth-1 sub-assembly.
    row = 1  # current write row (0 is header)
    prev_parent = None

    for entry in bom_entries:
        depth = entry.get("depth", 0)
        child_pn = entry.get("child_part_number", "")
        parent_pn = entry.get("parent_part_number", "")

        # Section header: when the parent changes for depth-1 entries
        if depth == 1 and parent_pn and parent_pn != prev_parent:
            if row > 1:
                row += 1  # blank separator row
            _write_subassembly_row(
                sheet, row, parent_pn, bom_entries, item_cache, schema
            )
            prev_parent = parent_pn
            row += 1

        _write_bom_entry_row(
            sheet,
            row,
            entry,
            item_cache.get(child_pn, {}),
            rev_cache.get(child_pn),
            schema,
        )
        row += 1

    # -- Formatting ---------------------------------------------------------
    _apply_bom_layout(doc, sheet)

    # Auto-tag all items with the project (if a project code is set)
    if project_code:
        _auto_tag_project(client, all_pns, project_code)

    return row - 1  # rows used below the header (incl. section/blank rows)


def _compute_subassembly_cost(
    bom_entries: List[Dict],
    parent_pn: str,
    item_cache: Dict[str, Dict],
) -> Optional[float]:
    """Sum unit_cost * quantity for direct children of parent_pn.

    Unit cost comes from the entry's metadata, else from the cached item's
    ``standard_cost``.  Entries without a cost, or whose cost or quantity
    cannot be converted to a number, are skipped rather than aborting the
    pull.  Returns None when no child contributed a cost.
    """
    total = 0.0
    found = False
    for e in bom_entries:
        if e.get("parent_part_number") != parent_pn:
            continue
        if not e.get("depth", 0) > 0:
            continue
        unit_cost = _get_meta_float(e, "unit_cost")
        if unit_cost is None:
            child = item_cache.get(e.get("child_part_number", ""), {})
            unit_cost = child.get("standard_cost")
        if unit_cost is None:
            continue
        try:
            total += float(unit_cost) * float(e.get("quantity") or 0)
        except (ValueError, TypeError):
            continue  # non-numeric cost/quantity: ignore this child
        found = True
    return total if found else None


def _build_property_cells(
    item: Dict, rev: Optional[Dict], bom_entry: Dict
) -> List[str]:
    """Build the property column values in order matching BOM_PROPERTY_HEADERS.

    Sources (priority): revision properties > BOM metadata > item fields.
    Every element of the result is a string; missing values are "".
    """
    result = []
    for header in sf.BOM_PROPERTY_HEADERS:
        db_key = sf.PROPERTY_KEY_MAP.get(header, "")
        val = ""
        if db_key:
            # Revision properties first, then BOM entry metadata
            val = _get_property(rev, db_key) or _get_meta(bom_entry, db_key)
        # Special case: Long Description from item field
        if header == "Long Description" and not val:
            val = item.get("long_description", "")
        # Special case: Notes from the BOM entry metadata
        if header == "Notes" and not val:
            val = _get_meta(bom_entry, "notes")
        result.append(str(val) if val else "")
    return result


def _auto_tag_project(
    client: SiloClient,
    part_numbers: set,
    project_code: str,
):
    """Tag all part numbers with the given project code (skip failures)."""
    for pn in part_numbers:
        if not pn:
            continue
        try:
            existing = client.get_item_projects(pn)
            existing_codes = (
                {p.get("code", "") for p in existing}
                if isinstance(existing, list)
                else set()
            )
            if project_code not in existing_codes:
                client.add_item_projects(pn, [project_code])
        except RuntimeError:
            pass  # Best-effort tagging


# ---------------------------------------------------------------------------
# SiloPullProject
# ---------------------------------------------------------------------------


def pull_project(
    client: SiloClient,
    doc,
    project_code: str,
    schema: str = "kindred-rd",
):
    """Fetch project items and populate an Items sheet.

    Also attempts to find an assembly and populate a BOM sheet.

    Returns the number of items fetched for the project.  Raises
    RuntimeError if UNO is unavailable or the project has no items.
    """
    if not _HAS_UNO:
        raise RuntimeError("UNO API not available")

    items = client.get_project_items(project_code)
    if not items:
        raise RuntimeError(f"No items found for project {project_code}")

    sheets = doc.getSheets()

    # -- Items sheet --------------------------------------------------------
    if not sheets.hasByName("Items"):
        sheets.insertNewByName("Items", sheets.getCount())
    items_sheet = sheets.getByName("Items")

    # Header
    for col_idx, header in enumerate(sf.ITEMS_HEADERS):
        _set_cell_string(items_sheet, col_idx, 0, header)
    _format_header_row(items_sheet, len(sf.ITEMS_HEADERS))

    for row_idx, item in enumerate(items, start=1):
        _set_cell_string(items_sheet, 0, row_idx, item.get("part_number", ""))
        _set_cell_string(items_sheet, 1, row_idx, item.get("description", ""))
        _set_cell_string(items_sheet, 2, row_idx, item.get("item_type", ""))
        _set_cell_string(items_sheet, 3, row_idx, item.get("sourcing_type", ""))
        _set_cell_string(items_sheet, 4, row_idx, schema)
        cost = item.get("standard_cost")
        if cost is not None:
            _set_cell_float(items_sheet, 5, row_idx, cost)
        _set_cell_string(items_sheet, 6, row_idx, item.get("sourcing_link", ""))
        _set_cell_string(items_sheet, 7, row_idx, item.get("long_description", ""))

        # Properties from latest revision (if available)
        rev = None
        try:
            revisions = client.get_revisions(item.get("part_number", ""))
            if revisions:
                rev = revisions[0]
        except RuntimeError:
            pass

        for offset, prop_key in enumerate(_ITEM_PROPERTY_KEYS):
            val = _get_property(rev, prop_key)
            if val:
                _set_cell_string(
                    items_sheet, _ITEM_PROP_COL_START + offset, row_idx, val
                )

        # Timestamps are truncated to their first 10 characters.
        _set_cell_string(
            items_sheet,
            _ITEM_CREATED_COL,
            row_idx,
            (item.get("created_at") or "")[:10],
        )
        _set_cell_string(
            items_sheet,
            _ITEM_UPDATED_COL,
            row_idx,
            (item.get("updated_at") or "")[:10],
        )

    # Freeze header
    _freeze_row(doc, 1)

    # -- BOM sheet (if we can find an assembly) -----------------------------
    assemblies = [i for i in items if i.get("item_type") == "assembly"]
    if assemblies:
        top_pn = assemblies[0].get("part_number", "")

        if not sheets.hasByName("BOM"):
            sheets.insertNewByName("BOM", 0)
        bom_sheet = sheets.getByName("BOM")

        try:
            pull_bom(
                client, doc, bom_sheet, top_pn, project_code=project_code, schema=schema
            )
        except RuntimeError:
            pass  # BOM sheet stays empty if fetch fails

    return len(items)