diff --git a/.gitignore b/.gitignore
index 0cbb933..3bf8233 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 # Binaries
 /silo
 /silod
+/migrate-storage
 *.exe
 *.dll
 *.so
diff --git a/Makefile b/Makefile
index c7bb968..ac085df 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,8 @@
 .PHONY: build run test test-integration clean migrate fmt lint \
 	docker-build docker-up docker-down docker-logs docker-ps \
 	docker-clean docker-rebuild \
-	web-install web-dev web-build
+	web-install web-dev web-build \
+	migrate-storage
 
 # =============================================================================
 # Local Development
@@ -55,6 +56,13 @@ tidy:
 migrate:
 	./scripts/init-db.sh
 
+# Build the MinIO → filesystem migration tool (build only; run the binary yourself)
+# Usage: make migrate-storage && ./migrate-storage -config <config.yaml> -dest <dir> [-dry-run] [-verbose]
+migrate-storage:
+	go build -o migrate-storage ./cmd/migrate-storage
+	@echo "Built ./migrate-storage"
+	@echo "Run: ./migrate-storage -config <config.yaml> -dest <dir> [-dry-run] [-verbose]"
+
 # Connect to database (requires psql)
 db-shell:
 	PGPASSWORD=$${SILO_DB_PASSWORD:-silodev} psql -h $${SILO_DB_HOST:-localhost} -U $${SILO_DB_USER:-silo} -d $${SILO_DB_NAME:-silo}
diff --git a/cmd/migrate-storage/main.go b/cmd/migrate-storage/main.go
new file mode 100644
index 0000000..5e64fcf
--- /dev/null
+++ b/cmd/migrate-storage/main.go
@@ -0,0 +1,288 @@
+// Command migrate-storage downloads files from MinIO and writes them to the
+// local filesystem. It is a one-shot migration tool for moving off MinIO.
+//
+// Usage:
+//
+//	migrate-storage -config config.yaml -dest /opt/silo/data [-dry-run] [-verbose]
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/kindredsystems/silo/internal/config"
+	"github.com/kindredsystems/silo/internal/db"
+	"github.com/kindredsystems/silo/internal/storage"
+	"github.com/rs/zerolog"
+)
+
+// fileEntry represents a single file to migrate.
+type fileEntry struct {
+	key       string
+	versionID string // MinIO version ID; empty if not versioned
+	size      int64  // expected size from DB; 0 if unknown
+}
+
+func main() {
+	configPath := flag.String("config", "config.yaml", "Path to configuration file")
+	dest := flag.String("dest", "", "Destination root directory (required)")
+	dryRun := flag.Bool("dry-run", false, "Preview what would be migrated without downloading")
+	verbose := flag.Bool("verbose", false, "Log every file, not just errors and summary")
+	flag.Parse()
+
+	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
+
+	if *dest == "" {
+		logger.Fatal().Msg("-dest is required")
+	}
+
+	// Load config (reuses existing config for DB + MinIO credentials).
+	cfg, err := config.Load(*configPath)
+	if err != nil {
+		logger.Fatal().Err(err).Msg("failed to load configuration")
+	}
+
+	ctx := context.Background()
+
+	// Connect to PostgreSQL.
+	database, err := db.Connect(ctx, db.Config{
+		Host:           cfg.Database.Host,
+		Port:           cfg.Database.Port,
+		Name:           cfg.Database.Name,
+		User:           cfg.Database.User,
+		Password:       cfg.Database.Password,
+		SSLMode:        cfg.Database.SSLMode,
+		MaxConnections: cfg.Database.MaxConnections,
+	})
+	if err != nil {
+		logger.Fatal().Err(err).Msg("failed to connect to database")
+	}
+	defer database.Close() // NOTE(review): any later logger.Fatal() exits the process and skips this defer — acceptable for a one-shot tool.
+	logger.Info().Msg("connected to database")
+
+	// Connect to MinIO.
+	store, err := storage.Connect(ctx, storage.Config{
+		Endpoint:  cfg.Storage.Endpoint,
+		AccessKey: cfg.Storage.AccessKey,
+		SecretKey: cfg.Storage.SecretKey,
+		Bucket:    cfg.Storage.Bucket,
+		UseSSL:    cfg.Storage.UseSSL,
+		Region:    cfg.Storage.Region,
+	})
+	if err != nil {
+		logger.Fatal().Err(err).Msg("failed to connect to MinIO")
+	}
+	logger.Info().Str("bucket", cfg.Storage.Bucket).Msg("connected to MinIO")
+
+	// Collect all file references from the database.
+	entries, err := collectEntries(ctx, logger, database)
+	if err != nil {
+		logger.Fatal().Err(err).Msg("failed to collect file entries from database")
+	}
+	logger.Info().Int("total", len(entries)).Msg("file entries found")
+
+	if len(entries) == 0 {
+		logger.Info().Msg("nothing to migrate")
+		return
+	}
+
+	// Migrate sequentially; individual failures are logged and counted, not fatal.
+	var migrated, skipped, failed int
+	start := time.Now()
+
+	for i, e := range entries {
+		destPath := filepath.Join(*dest, e.key) // NOTE(review): e.key is joined verbatim; a key containing ".." could escape *dest — confirm keys are trusted.
+
+		// Skip files already present with the expected size (makes re-runs resumable).
+		if info, err := os.Stat(destPath); err == nil {
+			if e.size > 0 && info.Size() == e.size { // unknown-size entries (e.g. thumbnails, added with size 0) are never skipped
+				if *verbose {
+					logger.Info().Str("key", e.key).Msg("skipped (already exists)")
+				}
+				skipped++
+				continue
+			}
+			// Size mismatch or unknown size — re-download.
+		}
+
+		if *dryRun {
+			logger.Info().
+				Str("key", e.key).
+				Int64("size", e.size).
+				Str("version", e.versionID).
+				Msgf("[%d/%d] would migrate", i+1, len(entries))
+			continue
+		}
+
+		if err := migrateFile(ctx, store, e, destPath); err != nil {
+			logger.Error().Err(err).Str("key", e.key).Msg("failed to migrate")
+			failed++
+			continue
+		}
+
+		migrated++
+		if *verbose {
+			logger.Info().
+				Str("key", e.key).
+				Int64("size", e.size).
+				Msgf("[%d/%d] migrated", i+1, len(entries))
+		} else if (i+1)%50 == 0 { // coarse progress heartbeat when not verbose
+			logger.Info().Msgf("progress: %d/%d", i+1, len(entries))
+		}
+	}
+
+	elapsed := time.Since(start)
+	ev := logger.Info().
+		Int("total", len(entries)).
+		Int("migrated", migrated).
+		Int("skipped", skipped).
+		Int("failed", failed).
+		Dur("elapsed", elapsed)
+	if *dryRun {
+		ev.Msg("dry run complete")
+	} else {
+		ev.Msg("migration complete")
+	}
+
+	// Non-zero exit signals partial failure to the caller (e.g. a wrapper script).
+	if failed > 0 {
+		os.Exit(1)
+	}
+}
+
+// collectEntries queries the database for all file references across the three
+// storage domains: revision files, item file attachments, and item thumbnails.
+// It deduplicates by key.
+func collectEntries(ctx context.Context, logger zerolog.Logger, database *db.DB) ([]fileEntry, error) {
+	pool := database.Pool()
+	seen := make(map[string]struct{})
+	var entries []fileEntry
+
+	// add records an entry once per key; empty keys and duplicates are dropped.
+	add := func(key, versionID string, size int64) {
+		if key == "" {
+			return
+		}
+		if _, ok := seen[key]; ok {
+			return
+		}
+		seen[key] = struct{}{}
+		entries = append(entries, fileEntry{key: key, versionID: versionID, size: size})
+	}
+
+	// 1. Revision files.
+	rows, err := pool.Query(ctx,
+		`SELECT file_key, COALESCE(file_version, ''), COALESCE(file_size, 0)
+		 FROM revisions WHERE file_key IS NOT NULL`)
+	if err != nil {
+		return nil, fmt.Errorf("querying revisions: %w", err)
+	}
+	for rows.Next() {
+		var key, version string
+		var size int64
+		if err := rows.Scan(&key, &version, &size); err != nil {
+			rows.Close()
+			return nil, fmt.Errorf("scanning revision row: %w", err)
+		}
+		add(key, version, size)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating revisions: %w", err)
+	}
+	logger.Info().Int("count", len(entries)).Msg("revision files found")
+
+	// 2. Item file attachments.
+	countBefore := len(entries) // delta of len(entries) gives the per-domain count after dedup
+	rows, err = pool.Query(ctx,
+		`SELECT object_key, size FROM item_files`)
+	if err != nil {
+		return nil, fmt.Errorf("querying item_files: %w", err)
+	}
+	for rows.Next() {
+		var key string
+		var size int64
+		if err := rows.Scan(&key, &size); err != nil {
+			rows.Close()
+			return nil, fmt.Errorf("scanning item_files row: %w", err)
+		}
+		add(key, "", size)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating item_files: %w", err)
+	}
+	logger.Info().Int("count", len(entries)-countBefore).Msg("item file attachments found")
+
+	// 3. Item thumbnails.
+	countBefore = len(entries)
+	rows, err = pool.Query(ctx,
+		`SELECT thumbnail_key FROM items WHERE thumbnail_key IS NOT NULL`)
+	if err != nil {
+		return nil, fmt.Errorf("querying item thumbnails: %w", err)
+	}
+	for rows.Next() {
+		var key string
+		if err := rows.Scan(&key); err != nil {
+			rows.Close()
+			return nil, fmt.Errorf("scanning thumbnail row: %w", err)
+		}
+		add(key, "", 0) // thumbnails carry no size column, so size stays 0 (unknown)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating thumbnails: %w", err)
+	}
+	logger.Info().Int("count", len(entries)-countBefore).Msg("item thumbnails found")
+
+	return entries, nil
+}
+
+// migrateFile downloads a single file from MinIO and writes it atomically to destPath
+// via a temp-file-then-rename sequence; the temp file is removed on any failure.
+func migrateFile(ctx context.Context, store *storage.Storage, e fileEntry, destPath string) error {
+	// Ensure parent directory exists.
+	if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
+		return fmt.Errorf("creating directory: %w", err)
+	}
+
+	// Download from MinIO, pinning the exact version when one is recorded.
+	var reader io.ReadCloser
+	var err error
+	if e.versionID != "" {
+		reader, err = store.GetVersion(ctx, e.key, e.versionID)
+	} else {
+		reader, err = store.Get(ctx, e.key)
+	}
+	if err != nil {
+		return fmt.Errorf("downloading from MinIO: %w", err)
+	}
+	defer reader.Close()
+
+	// Write to temp file then rename for atomicity. NOTE(review): no f.Sync() before
+	// the rename, so a crash shortly after can still leave truncated data on disk —
+	// confirm this is acceptable for a re-runnable one-shot migration.
+	tmpPath := destPath + ".tmp" // NOTE(review): fixed suffix; concurrent runs would clobber each other — confirm single-instance use
+	f, err := os.Create(tmpPath)
+	if err != nil {
+		return fmt.Errorf("creating temp file: %w", err)
+	}
+
+	if _, err := io.Copy(f, reader); err != nil { // NOTE(review): byte count discarded; written size is never checked against e.size
+		f.Close()
+		os.Remove(tmpPath)
+		return fmt.Errorf("writing file: %w", err)
+	}
+
+	if err := f.Close(); err != nil {
+		os.Remove(tmpPath)
+		return fmt.Errorf("closing temp file: %w", err)
+	}
+
+	if err := os.Rename(tmpPath, destPath); err != nil {
+		os.Remove(tmpPath)
+		return fmt.Errorf("renaming temp file: %w", err)
+	}
+
+	return nil
+}