package db import ( "context" "encoding/json" "fmt" "time" "github.com/jackc/pgx/v5" ) // ItemMetadata represents a row in the item_metadata table. type ItemMetadata struct { ItemID string SchemaName *string Tags []string LifecycleState string Fields map[string]any KCVersion *string ManifestUUID *string SiloInstance *string RevisionHash *string UpdatedAt time.Time UpdatedBy *string } // ItemMetadataRepository provides item_metadata database operations. type ItemMetadataRepository struct { db *DB } // NewItemMetadataRepository creates a new item metadata repository. func NewItemMetadataRepository(db *DB) *ItemMetadataRepository { return &ItemMetadataRepository{db: db} } // Get returns metadata for an item, or nil if none exists. func (r *ItemMetadataRepository) Get(ctx context.Context, itemID string) (*ItemMetadata, error) { m := &ItemMetadata{} var fieldsJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT item_id, schema_name, tags, lifecycle_state, fields, kc_version, manifest_uuid, silo_instance, revision_hash, updated_at, updated_by FROM item_metadata WHERE item_id = $1 `, itemID).Scan( &m.ItemID, &m.SchemaName, &m.Tags, &m.LifecycleState, &fieldsJSON, &m.KCVersion, &m.ManifestUUID, &m.SiloInstance, &m.RevisionHash, &m.UpdatedAt, &m.UpdatedBy, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("getting item metadata: %w", err) } if fieldsJSON != nil { if err := json.Unmarshal(fieldsJSON, &m.Fields); err != nil { return nil, fmt.Errorf("unmarshaling fields: %w", err) } } if m.Fields == nil { m.Fields = make(map[string]any) } if m.Tags == nil { m.Tags = []string{} } return m, nil } // Upsert inserts or updates the metadata row for an item. // Used by the commit extraction pipeline. func (r *ItemMetadataRepository) Upsert(ctx context.Context, m *ItemMetadata) error { fieldsJSON, err := json.Marshal(m.Fields) if err != nil { return fmt.Errorf("marshaling fields: %w", err) } _, err = r.db.pool.Exec(ctx, ` INSERT INTO item_metadata (item_id, schema_name, tags, lifecycle_state, fields, kc_version, manifest_uuid, silo_instance, revision_hash, updated_at, updated_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now(), $10) ON CONFLICT (item_id) DO UPDATE SET schema_name = EXCLUDED.schema_name, tags = EXCLUDED.tags, lifecycle_state = EXCLUDED.lifecycle_state, fields = EXCLUDED.fields, kc_version = EXCLUDED.kc_version, manifest_uuid = EXCLUDED.manifest_uuid, silo_instance = EXCLUDED.silo_instance, revision_hash = EXCLUDED.revision_hash, updated_at = now(), updated_by = EXCLUDED.updated_by `, m.ItemID, m.SchemaName, m.Tags, m.LifecycleState, fieldsJSON, m.KCVersion, m.ManifestUUID, m.SiloInstance, m.RevisionHash, m.UpdatedBy) if err != nil { return fmt.Errorf("upserting item metadata: %w", err) } return nil } // UpdateFields merges the given fields into the existing JSONB fields column. func (r *ItemMetadataRepository) UpdateFields(ctx context.Context, itemID string, fields map[string]any, updatedBy string) error { fieldsJSON, err := json.Marshal(fields) if err != nil { return fmt.Errorf("marshaling fields: %w", err) } tag, err := r.db.pool.Exec(ctx, ` UPDATE item_metadata SET fields = fields || $2::jsonb, updated_at = now(), updated_by = $3 WHERE item_id = $1 `, itemID, fieldsJSON, updatedBy) if err != nil { return fmt.Errorf("updating metadata fields: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("item metadata not found") } return nil } // UpdateLifecycle sets the lifecycle_state column. func (r *ItemMetadataRepository) UpdateLifecycle(ctx context.Context, itemID, state, updatedBy string) error { tag, err := r.db.pool.Exec(ctx, ` UPDATE item_metadata SET lifecycle_state = $2, updated_at = now(), updated_by = $3 WHERE item_id = $1 `, itemID, state, updatedBy) if err != nil { return fmt.Errorf("updating lifecycle state: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("item metadata not found") } return nil } // SetTags replaces the tags array. func (r *ItemMetadataRepository) SetTags(ctx context.Context, itemID string, tags []string, updatedBy string) error { tag, err := r.db.pool.Exec(ctx, ` UPDATE item_metadata SET tags = $2, updated_at = now(), updated_by = $3 WHERE item_id = $1 `, itemID, tags, updatedBy) if err != nil { return fmt.Errorf("updating tags: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("item metadata not found") } return nil }