212 lines
6.3 KiB
Go
212 lines
6.3 KiB
Go
// Package migration provides property schema migration functionality.
|
|
package migration
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/kindredsystems/silo/internal/db"
|
|
"github.com/kindredsystems/silo/internal/schema"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// PropertyMigrator handles property schema migrations.
|
|
type PropertyMigrator struct {
|
|
db *db.DB
|
|
schema *schema.Schema
|
|
logger zerolog.Logger
|
|
}
|
|
|
|
// NewPropertyMigrator creates a new property migrator.
|
|
func NewPropertyMigrator(database *db.DB, sch *schema.Schema, logger zerolog.Logger) *PropertyMigrator {
|
|
return &PropertyMigrator{
|
|
db: database,
|
|
schema: sch,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// MigrationResult describes one property migration run, either as returned by
// MigrateToVersion or as read back from the property_migrations table by
// GetMigrationHistory.
type MigrationResult struct {
	// MigrationID is the id of the run's row in property_migrations.
	MigrationID string
	// FromVersion and ToVersion are the schema versions recorded for the run.
	FromVersion, ToVersion int
	// ItemsAffected counts the items that were migrated successfully.
	ItemsAffected int
	// StartedAt and CompletedAt bracket the run. When loaded from history, a
	// run with no stored completion time reports the query-time now() instead.
	StartedAt, CompletedAt time.Time
	// Status is the run state; this file uses "running", "completed" and
	// "failed".
	Status string
	// Error carries the stored error message when the result was loaded by
	// GetMigrationHistory. MigrateToVersion does not set it.
	Error error
}
|
|
|
|
// MigrateToVersion migrates all items to the specified property schema version.
|
|
// This creates new revisions for items that have outdated property schemas.
|
|
func (m *PropertyMigrator) MigrateToVersion(ctx context.Context, targetVersion int) (*MigrationResult, error) {
|
|
if m.schema.PropertySchemas == nil {
|
|
return nil, fmt.Errorf("no property schema defined for %s", m.schema.Name)
|
|
}
|
|
|
|
result := &MigrationResult{
|
|
FromVersion: 0, // Will be updated based on what we find
|
|
ToVersion: targetVersion,
|
|
StartedAt: time.Now(),
|
|
Status: "running",
|
|
}
|
|
|
|
// Record migration start
|
|
var migrationID string
|
|
err := m.db.Pool().QueryRow(ctx, `
|
|
INSERT INTO property_migrations (schema_name, from_version, to_version, status)
|
|
VALUES ($1, $2, $3, 'running')
|
|
RETURNING id
|
|
`, m.schema.Name, 0, targetVersion).Scan(&migrationID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("recording migration start: %w", err)
|
|
}
|
|
result.MigrationID = migrationID
|
|
|
|
// Find items needing migration
|
|
rows, err := m.db.Pool().Query(ctx, `
|
|
SELECT DISTINCT i.id, i.part_number, r.properties, r.property_schema_version
|
|
FROM items i
|
|
JOIN revisions r ON r.item_id = i.id AND r.revision_number = i.current_revision
|
|
WHERE i.archived_at IS NULL
|
|
AND (r.property_schema_version < $1 OR r.property_schema_version IS NULL)
|
|
`, targetVersion)
|
|
if err != nil {
|
|
m.recordMigrationError(ctx, migrationID, err)
|
|
return nil, fmt.Errorf("querying items: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
type itemToMigrate struct {
|
|
ID string
|
|
PartNumber string
|
|
Properties map[string]any
|
|
Version *int
|
|
}
|
|
|
|
var items []itemToMigrate
|
|
for rows.Next() {
|
|
var item itemToMigrate
|
|
var propsJSON []byte
|
|
if err := rows.Scan(&item.ID, &item.PartNumber, &propsJSON, &item.Version); err != nil {
|
|
m.recordMigrationError(ctx, migrationID, err)
|
|
return nil, fmt.Errorf("scanning row: %w", err)
|
|
}
|
|
if len(propsJSON) > 0 {
|
|
if err := json.Unmarshal(propsJSON, &item.Properties); err != nil {
|
|
m.logger.Warn().Err(err).Str("part_number", item.PartNumber).Msg("failed to parse properties")
|
|
item.Properties = make(map[string]any)
|
|
}
|
|
} else {
|
|
item.Properties = make(map[string]any)
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
m.recordMigrationError(ctx, migrationID, err)
|
|
return nil, fmt.Errorf("iterating rows: %w", err)
|
|
}
|
|
|
|
m.logger.Info().Int("count", len(items)).Int("target_version", targetVersion).Msg("migrating items")
|
|
|
|
// Migrate each item
|
|
for _, item := range items {
|
|
if err := m.migrateItem(ctx, item.ID, item.PartNumber, item.Properties, targetVersion); err != nil {
|
|
m.logger.Error().Err(err).Str("part_number", item.PartNumber).Msg("failed to migrate item")
|
|
// Continue with other items
|
|
} else {
|
|
result.ItemsAffected++
|
|
}
|
|
}
|
|
|
|
// Record success
|
|
result.CompletedAt = time.Now()
|
|
result.Status = "completed"
|
|
|
|
_, err = m.db.Pool().Exec(ctx, `
|
|
UPDATE property_migrations
|
|
SET completed_at = $1, status = 'completed', items_affected = $2
|
|
WHERE id = $3
|
|
`, result.CompletedAt, result.ItemsAffected, migrationID)
|
|
if err != nil {
|
|
m.logger.Warn().Err(err).Msg("failed to update migration record")
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// migrateItem applies property defaults and creates a new revision.
|
|
func (m *PropertyMigrator) migrateItem(ctx context.Context, itemID, partNumber string, properties map[string]any, targetVersion int) error {
|
|
// Get category from properties
|
|
category, _ := properties["category"].(string)
|
|
|
|
// Apply defaults for missing properties
|
|
newProps := m.schema.PropertySchemas.ApplyDefaults(properties, category)
|
|
|
|
propsJSON, err := json.Marshal(newProps)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
// Create new revision with updated properties
|
|
comment := fmt.Sprintf("Property schema migration to version %d", targetVersion)
|
|
_, err = m.db.Pool().Exec(ctx, `
|
|
INSERT INTO revisions (item_id, revision_number, properties, property_schema_version, comment)
|
|
SELECT $1, current_revision + 1, $2, $3, $4
|
|
FROM items WHERE id = $1
|
|
`, itemID, propsJSON, targetVersion, comment)
|
|
if err != nil {
|
|
return fmt.Errorf("creating revision: %w", err)
|
|
}
|
|
|
|
m.logger.Debug().Str("part_number", partNumber).Int("version", targetVersion).Msg("migrated item")
|
|
return nil
|
|
}
|
|
|
|
func (m *PropertyMigrator) recordMigrationError(ctx context.Context, migrationID string, err error) {
|
|
_, dbErr := m.db.Pool().Exec(ctx, `
|
|
UPDATE property_migrations
|
|
SET completed_at = now(), status = 'failed', error_message = $1
|
|
WHERE id = $2
|
|
`, err.Error(), migrationID)
|
|
if dbErr != nil {
|
|
m.logger.Warn().Err(dbErr).Msg("failed to record migration error")
|
|
}
|
|
}
|
|
|
|
// GetMigrationHistory returns recent property migrations.
|
|
func (m *PropertyMigrator) GetMigrationHistory(ctx context.Context, limit int) ([]MigrationResult, error) {
|
|
rows, err := m.db.Pool().Query(ctx, `
|
|
SELECT id, from_version, to_version, items_affected, started_at,
|
|
COALESCE(completed_at, now()), status, error_message
|
|
FROM property_migrations
|
|
WHERE schema_name = $1
|
|
ORDER BY started_at DESC
|
|
LIMIT $2
|
|
`, m.schema.Name, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var results []MigrationResult
|
|
for rows.Next() {
|
|
var r MigrationResult
|
|
var errMsg *string
|
|
if err := rows.Scan(&r.MigrationID, &r.FromVersion, &r.ToVersion,
|
|
&r.ItemsAffected, &r.StartedAt, &r.CompletedAt, &r.Status, &errMsg); err != nil {
|
|
return nil, err
|
|
}
|
|
if errMsg != nil {
|
|
r.Error = fmt.Errorf("%s", *errMsg)
|
|
}
|
|
results = append(results, r)
|
|
}
|
|
|
|
return results, rows.Err()
|
|
}
|