Implements issue #141 — .kc server-side metadata integration Phase 1. When a .kc file is uploaded, the server extracts silo/manifest.json and silo/metadata.json from the ZIP archive and indexes them into the item_metadata table. Plain .fcstd files continue to work unchanged. Extraction is best-effort: failures are logged but do not block the upload. New packages: - internal/kc: ZIP extraction library (Extract, Manifest, Metadata types) - internal/db: ItemMetadataRepository (Get, Upsert, UpdateFields, UpdateLifecycle, SetTags) New API endpoints under /api/items/{partNumber}: - GET /metadata — read indexed metadata (viewer) - PUT /metadata — merge fields into JSONB (editor) - PATCH /metadata/lifecycle — transition lifecycle state (editor) - PATCH /metadata/tags — add/remove tags (editor) SSE events: metadata.updated, metadata.lifecycle, metadata.tags Lifecycle transitions (Phase 1): draft→review→released→obsolete, review→draft (reject). Closes #141
162 lines
4.6 KiB
Go
162 lines
4.6 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// ItemMetadata represents a row in the item_metadata table.
|
|
type ItemMetadata struct {
|
|
ItemID string
|
|
SchemaName *string
|
|
Tags []string
|
|
LifecycleState string
|
|
Fields map[string]any
|
|
KCVersion *string
|
|
ManifestUUID *string
|
|
SiloInstance *string
|
|
RevisionHash *string
|
|
UpdatedAt time.Time
|
|
UpdatedBy *string
|
|
}
|
|
|
|
// ItemMetadataRepository provides item_metadata database operations.
|
|
type ItemMetadataRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
// NewItemMetadataRepository creates a new item metadata repository.
|
|
func NewItemMetadataRepository(db *DB) *ItemMetadataRepository {
|
|
return &ItemMetadataRepository{db: db}
|
|
}
|
|
|
|
// Get returns metadata for an item, or nil if none exists.
|
|
func (r *ItemMetadataRepository) Get(ctx context.Context, itemID string) (*ItemMetadata, error) {
|
|
m := &ItemMetadata{}
|
|
var fieldsJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT item_id, schema_name, tags, lifecycle_state, fields,
|
|
kc_version, manifest_uuid, silo_instance, revision_hash,
|
|
updated_at, updated_by
|
|
FROM item_metadata
|
|
WHERE item_id = $1
|
|
`, itemID).Scan(
|
|
&m.ItemID, &m.SchemaName, &m.Tags, &m.LifecycleState, &fieldsJSON,
|
|
&m.KCVersion, &m.ManifestUUID, &m.SiloInstance, &m.RevisionHash,
|
|
&m.UpdatedAt, &m.UpdatedBy,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting item metadata: %w", err)
|
|
}
|
|
if fieldsJSON != nil {
|
|
if err := json.Unmarshal(fieldsJSON, &m.Fields); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling fields: %w", err)
|
|
}
|
|
}
|
|
if m.Fields == nil {
|
|
m.Fields = make(map[string]any)
|
|
}
|
|
if m.Tags == nil {
|
|
m.Tags = []string{}
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// Upsert inserts or updates the metadata row for an item.
|
|
// Used by the commit extraction pipeline.
|
|
func (r *ItemMetadataRepository) Upsert(ctx context.Context, m *ItemMetadata) error {
|
|
fieldsJSON, err := json.Marshal(m.Fields)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling fields: %w", err)
|
|
}
|
|
_, err = r.db.pool.Exec(ctx, `
|
|
INSERT INTO item_metadata
|
|
(item_id, schema_name, tags, lifecycle_state, fields,
|
|
kc_version, manifest_uuid, silo_instance, revision_hash,
|
|
updated_at, updated_by)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now(), $10)
|
|
ON CONFLICT (item_id) DO UPDATE SET
|
|
schema_name = EXCLUDED.schema_name,
|
|
tags = EXCLUDED.tags,
|
|
lifecycle_state = EXCLUDED.lifecycle_state,
|
|
fields = EXCLUDED.fields,
|
|
kc_version = EXCLUDED.kc_version,
|
|
manifest_uuid = EXCLUDED.manifest_uuid,
|
|
silo_instance = EXCLUDED.silo_instance,
|
|
revision_hash = EXCLUDED.revision_hash,
|
|
updated_at = now(),
|
|
updated_by = EXCLUDED.updated_by
|
|
`, m.ItemID, m.SchemaName, m.Tags, m.LifecycleState, fieldsJSON,
|
|
m.KCVersion, m.ManifestUUID, m.SiloInstance, m.RevisionHash,
|
|
m.UpdatedBy)
|
|
if err != nil {
|
|
return fmt.Errorf("upserting item metadata: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateFields merges the given fields into the existing JSONB fields column.
|
|
func (r *ItemMetadataRepository) UpdateFields(ctx context.Context, itemID string, fields map[string]any, updatedBy string) error {
|
|
fieldsJSON, err := json.Marshal(fields)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling fields: %w", err)
|
|
}
|
|
tag, err := r.db.pool.Exec(ctx, `
|
|
UPDATE item_metadata
|
|
SET fields = fields || $2::jsonb,
|
|
updated_at = now(),
|
|
updated_by = $3
|
|
WHERE item_id = $1
|
|
`, itemID, fieldsJSON, updatedBy)
|
|
if err != nil {
|
|
return fmt.Errorf("updating metadata fields: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("item metadata not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateLifecycle sets the lifecycle_state column.
|
|
func (r *ItemMetadataRepository) UpdateLifecycle(ctx context.Context, itemID, state, updatedBy string) error {
|
|
tag, err := r.db.pool.Exec(ctx, `
|
|
UPDATE item_metadata
|
|
SET lifecycle_state = $2,
|
|
updated_at = now(),
|
|
updated_by = $3
|
|
WHERE item_id = $1
|
|
`, itemID, state, updatedBy)
|
|
if err != nil {
|
|
return fmt.Errorf("updating lifecycle state: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("item metadata not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetTags replaces the tags array.
|
|
func (r *ItemMetadataRepository) SetTags(ctx context.Context, itemID string, tags []string, updatedBy string) error {
|
|
tag, err := r.db.pool.Exec(ctx, `
|
|
UPDATE item_metadata
|
|
SET tags = $2,
|
|
updated_at = now(),
|
|
updated_by = $3
|
|
WHERE item_id = $1
|
|
`, itemID, tags, updatedBy)
|
|
if err != nil {
|
|
return fmt.Errorf("updating tags: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("item metadata not found")
|
|
}
|
|
return nil
|
|
}
|