Files
silo/internal/migration/properties.go
2026-01-24 15:03:17 -06:00

212 lines
6.3 KiB
Go

// Package migration provides property schema migration functionality.
package migration
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/kindredsystems/silo/internal/db"
"github.com/kindredsystems/silo/internal/schema"
"github.com/rs/zerolog"
)
// PropertyMigrator handles property schema migrations.
// It bundles the database handle, the schema definition, and the logger that
// every migration method on this type uses.
type PropertyMigrator struct {
// db is the database handle; all statements are issued through db.Pool().
db *db.DB
// schema supplies the schema name recorded with each migration and the
// PropertySchemas used to apply property defaults.
schema *schema.Schema
// logger receives progress, warning, and error events from migration runs.
logger zerolog.Logger
}
// NewPropertyMigrator builds a PropertyMigrator bound to the given database
// handle, schema definition, and logger. The arguments are stored as-is; no
// validation or I/O happens here.
func NewPropertyMigrator(database *db.DB, sch *schema.Schema, logger zerolog.Logger) *PropertyMigrator {
	migrator := new(PropertyMigrator)
	migrator.db = database
	migrator.schema = sch
	migrator.logger = logger
	return migrator
}
// MigrationResult contains the results of a migration run.
// Values are produced either by MigrateToVersion for the run it just executed
// or by GetMigrationHistory from rows of the property_migrations table.
type MigrationResult struct {
// MigrationID is the id of the run's row in property_migrations.
MigrationID string
// FromVersion is the schema version the run started from (from_version).
FromVersion int
// ToVersion is the schema version the run migrated items to (to_version).
ToVersion int
// ItemsAffected counts the items that were migrated successfully.
ItemsAffected int
// StartedAt is when the run began.
StartedAt time.Time
// CompletedAt is when the run finished. GetMigrationHistory substitutes the
// database's now() when the stored completed_at is NULL.
CompletedAt time.Time
// Status is the run state; this file writes "running", "completed", and
// "failed".
Status string
// Error carries the stored error_message, if any. It is populated by
// GetMigrationHistory; MigrateToVersion reports failures through its own
// error return instead.
Error error
}
// MigrateToVersion migrates all items to the specified property schema version.
// This creates new revisions for items that have outdated property schemas.
//
// The run is recorded in property_migrations: a 'running' row is inserted up
// front and later marked 'completed' (with the number of migrated items and
// the lowest schema version that was found) or 'failed'.
//
// Every non-archived item whose current revision has a NULL property schema
// version, or one below targetVersion, is selected. Items are migrated one at
// a time; a failure on a single item is logged and does not abort the run, so
// a 'completed' run may still have skipped items (compare ItemsAffected with
// the logged item count).
//
// An error is returned when no property schema is defined, when the migration
// row cannot be created, when the item query fails, or when ctx is cancelled
// before all items were attempted. The result is nil whenever an error is
// returned.
func (m *PropertyMigrator) MigrateToVersion(ctx context.Context, targetVersion int) (*MigrationResult, error) {
	if m.schema.PropertySchemas == nil {
		return nil, fmt.Errorf("no property schema defined for %s", m.schema.Name)
	}

	result := &MigrationResult{
		FromVersion: 0, // Replaced below by the lowest version actually found.
		ToVersion:   targetVersion,
		StartedAt:   time.Now(),
		Status:      "running",
	}

	// Record migration start. from_version is not known yet, so it is
	// written as 0 here and corrected when the run completes.
	var migrationID string
	err := m.db.Pool().QueryRow(ctx, `
		INSERT INTO property_migrations (schema_name, from_version, to_version, status)
		VALUES ($1, $2, $3, 'running')
		RETURNING id
	`, m.schema.Name, 0, targetVersion).Scan(&migrationID)
	if err != nil {
		return nil, fmt.Errorf("recording migration start: %w", err)
	}
	result.MigrationID = migrationID

	// Find items needing migration
	rows, err := m.db.Pool().Query(ctx, `
		SELECT DISTINCT i.id, i.part_number, r.properties, r.property_schema_version
		FROM items i
		JOIN revisions r ON r.item_id = i.id AND r.revision_number = i.current_revision
		WHERE i.archived_at IS NULL
		AND (r.property_schema_version < $1 OR r.property_schema_version IS NULL)
	`, targetVersion)
	if err != nil {
		m.recordMigrationError(ctx, migrationID, err)
		return nil, fmt.Errorf("querying items: %w", err)
	}
	defer rows.Close()

	type itemToMigrate struct {
		ID         string
		PartNumber string
		Properties map[string]any
		Version    *int // nil when property_schema_version is NULL
	}

	// All rows are read into memory first so the result set is drained
	// before the per-item INSERTs are issued.
	var items []itemToMigrate
	for rows.Next() {
		var item itemToMigrate
		var propsJSON []byte
		if err := rows.Scan(&item.ID, &item.PartNumber, &propsJSON, &item.Version); err != nil {
			m.recordMigrationError(ctx, migrationID, err)
			return nil, fmt.Errorf("scanning row: %w", err)
		}

		if len(propsJSON) > 0 {
			// NOTE(review): an item whose stored properties fail to parse is
			// still migrated from an empty map, so its new revision carries
			// only defaults — confirm this is intended.
			if err := json.Unmarshal(propsJSON, &item.Properties); err != nil {
				m.logger.Warn().Err(err).Str("part_number", item.PartNumber).Msg("failed to parse properties")
				item.Properties = nil
			}
		}
		// A JSON null unmarshals without error but leaves the map nil; always
		// hand a usable (non-nil) map to the migration step.
		if item.Properties == nil {
			item.Properties = make(map[string]any)
		}

		// Track the lowest version seen; a NULL version counts as 0.
		version := 0
		if item.Version != nil {
			version = *item.Version
		}
		if len(items) == 0 || version < result.FromVersion {
			result.FromVersion = version
		}

		items = append(items, item)
	}
	if err := rows.Err(); err != nil {
		m.recordMigrationError(ctx, migrationID, err)
		return nil, fmt.Errorf("iterating rows: %w", err)
	}

	m.logger.Info().Int("count", len(items)).Int("target_version", targetVersion).Msg("migrating items")

	// Migrate each item
	failed := 0
	for _, item := range items {
		// Stop promptly on cancellation instead of failing every remaining
		// item and then reporting the run as completed.
		if ctxErr := ctx.Err(); ctxErr != nil {
			runErr := fmt.Errorf("migration interrupted after %d of %d items: %w",
				result.ItemsAffected+failed, len(items), ctxErr)
			// ctx is no longer usable for database work; record the failure
			// on a short-lived context of its own.
			recordCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			m.recordMigrationError(recordCtx, migrationID, runErr)
			cancel()
			return nil, runErr
		}

		if err := m.migrateItem(ctx, item.ID, item.PartNumber, item.Properties, targetVersion); err != nil {
			m.logger.Error().Err(err).Str("part_number", item.PartNumber).Msg("failed to migrate item")
			failed++
			// Continue with other items
			continue
		}
		result.ItemsAffected++
	}
	if failed > 0 {
		m.logger.Warn().Int("failed", failed).Int("migrated", result.ItemsAffected).Msg("migration finished with item failures")
	}

	// Record success, including the from_version discovered above.
	result.CompletedAt = time.Now()
	result.Status = "completed"
	_, err = m.db.Pool().Exec(ctx, `
		UPDATE property_migrations
		SET completed_at = $1, status = 'completed', items_affected = $2, from_version = $3
		WHERE id = $4
	`, result.CompletedAt, result.ItemsAffected, result.FromVersion, migrationID)
	if err != nil {
		// The items are already migrated; a bookkeeping failure is only logged.
		m.logger.Warn().Err(err).Msg("failed to update migration record")
	}

	return result, nil
}
// migrateItem applies property defaults and creates a new revision.
//
// The item's category is read from properties["category"], defaults are
// applied through the schema's PropertySchemas, and the result is inserted as
// revision current_revision + 1 tagged with targetVersion. partNumber is used
// for logging only.
//
// It returns an error when the properties cannot be marshaled, when the
// INSERT fails, or when no row was inserted because the item no longer exists.
//
// NOTE(review): this function does not update items.current_revision itself;
// confirm that a trigger or another code path advances it, otherwise the new
// revision never becomes current and the item is selected again on the next
// run.
func (m *PropertyMigrator) migrateItem(ctx context.Context, itemID, partNumber string, properties map[string]any, targetVersion int) error {
	// Get category from properties. A missing or non-string value yields "",
	// which is passed to ApplyDefaults unchanged.
	category, _ := properties["category"].(string)

	// Apply defaults for missing properties
	newProps := m.schema.PropertySchemas.ApplyDefaults(properties, category)
	propsJSON, err := json.Marshal(newProps)
	if err != nil {
		return fmt.Errorf("marshaling properties: %w", err)
	}

	// Create new revision with updated properties
	comment := fmt.Sprintf("Property schema migration to version %d", targetVersion)
	tag, err := m.db.Pool().Exec(ctx, `
		INSERT INTO revisions (item_id, revision_number, properties, property_schema_version, comment)
		SELECT $1, current_revision + 1, $2, $3, $4
		FROM items WHERE id = $1
	`, itemID, propsJSON, targetVersion, comment)
	if err != nil {
		return fmt.Errorf("creating revision: %w", err)
	}
	// INSERT ... SELECT succeeds with zero rows when the item row is gone;
	// report that instead of counting the item as migrated.
	if tag.RowsAffected() == 0 {
		return fmt.Errorf("creating revision: item %s not found", itemID)
	}

	m.logger.Debug().Str("part_number", partNumber).Int("version", targetVersion).Msg("migrated item")
	return nil
}
// recordMigrationError marks the migration row identified by migrationID as
// 'failed', storing err's message and the completion time. It is best-effort:
// a failure to write the record is logged as a warning and not returned.
//
// When ctx is already cancelled or past its deadline — often the very reason
// the migration failed — the update runs on a short-lived detached context so
// the row is not left in the 'running' state.
func (m *PropertyMigrator) recordMigrationError(ctx context.Context, migrationID string, err error) {
	const recordTimeout = 5 * time.Second

	if ctx.Err() != nil {
		detached, cancel := context.WithTimeout(context.Background(), recordTimeout)
		defer cancel()
		ctx = detached
	}

	_, dbErr := m.db.Pool().Exec(ctx, `
		UPDATE property_migrations
		SET completed_at = now(), status = 'failed', error_message = $1
		WHERE id = $2
	`, err.Error(), migrationID)
	if dbErr != nil {
		m.logger.Warn().Err(dbErr).Msg("failed to record migration error")
	}
}
// GetMigrationHistory returns recent property migrations.
//
// At most limit rows recorded for this migrator's schema name are returned,
// newest first (by started_at). limit must not be negative. For a row whose
// completed_at is NULL, CompletedAt holds the database's now() at query time
// rather than a real completion time; check Status to tell the cases apart.
// A stored error_message is surfaced through MigrationResult.Error.
//
// It returns an error when limit is negative or when querying, scanning, or
// iterating the rows fails; the slice is nil in that case.
func (m *PropertyMigrator) GetMigrationHistory(ctx context.Context, limit int) ([]MigrationResult, error) {
	if limit < 0 {
		return nil, fmt.Errorf("invalid migration history limit %d: must not be negative", limit)
	}

	// items_affected is only written when a run completes; COALESCE keeps the
	// scan into a plain int working should the column be NULL for running or
	// failed rows (the column default is not visible from this file).
	rows, err := m.db.Pool().Query(ctx, `
		SELECT id, from_version, to_version, COALESCE(items_affected, 0), started_at,
		COALESCE(completed_at, now()), status, error_message
		FROM property_migrations
		WHERE schema_name = $1
		ORDER BY started_at DESC
		LIMIT $2
	`, m.schema.Name, limit)
	if err != nil {
		return nil, fmt.Errorf("querying migration history: %w", err)
	}
	defer rows.Close()

	var results []MigrationResult
	for rows.Next() {
		var r MigrationResult
		var errMsg *string // nil when error_message is NULL
		if err := rows.Scan(&r.MigrationID, &r.FromVersion, &r.ToVersion,
			&r.ItemsAffected, &r.StartedAt, &r.CompletedAt, &r.Status, &errMsg); err != nil {
			return nil, fmt.Errorf("scanning migration history row: %w", err)
		}
		if errMsg != nil {
			r.Error = fmt.Errorf("%s", *errMsg)
		}
		results = append(results, r)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating migration history: %w", err)
	}
	return results, nil
}