feat(storage): add MinIO to filesystem migration tool #137
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,6 +1,7 @@
|
||||
# Binaries
|
||||
/silo
|
||||
/silod
|
||||
/migrate-storage
|
||||
*.exe
|
||||
*.dll
|
||||
*.so
|
||||
|
||||
10
Makefile
10
Makefile
@@ -1,7 +1,8 @@
|
||||
.PHONY: build run test test-integration clean migrate fmt lint \
|
||||
docker-build docker-up docker-down docker-logs docker-ps \
|
||||
docker-clean docker-rebuild \
|
||||
web-install web-dev web-build
|
||||
web-install web-dev web-build \
|
||||
migrate-storage
|
||||
|
||||
# =============================================================================
|
||||
# Local Development
|
||||
@@ -55,6 +56,13 @@ tidy:
|
||||
migrate:
|
||||
./scripts/init-db.sh
|
||||
|
||||
# Build and run MinIO → filesystem migration tool
|
||||
# Usage: make migrate-storage DEST=/opt/silo/data [ARGS="--dry-run --verbose"]
|
||||
migrate-storage:
|
||||
go build -o migrate-storage ./cmd/migrate-storage
|
||||
@echo "Built ./migrate-storage"
|
||||
@echo "Run: ./migrate-storage -config <config.yaml> -dest <dir> [-dry-run] [-verbose]"
|
||||
|
||||
# Connect to database (requires psql)
|
||||
db-shell:
|
||||
PGPASSWORD=$${SILO_DB_PASSWORD:-silodev} psql -h $${SILO_DB_HOST:-localhost} -U $${SILO_DB_USER:-silo} -d $${SILO_DB_NAME:-silo}
|
||||
|
||||
288
cmd/migrate-storage/main.go
Normal file
288
cmd/migrate-storage/main.go
Normal file
@@ -0,0 +1,288 @@
|
||||
// Command migrate-storage downloads files from MinIO and writes them to the
|
||||
// local filesystem. It is a one-shot migration tool for moving off MinIO.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// migrate-storage -config config.yaml -dest /opt/silo/data [-dry-run] [-verbose]
|
||||
package main
|
||||
|
||||
import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/kindredsystems/silo/internal/config"
	"github.com/kindredsystems/silo/internal/db"
	"github.com/kindredsystems/silo/internal/storage"
	"github.com/rs/zerolog"
)
|
||||
|
||||
// fileEntry represents a single file to migrate.
|
||||
type fileEntry struct {
|
||||
key string
|
||||
versionID string // MinIO version ID; empty if not versioned
|
||||
size int64 // expected size from DB; 0 if unknown
|
||||
}
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "config.yaml", "Path to configuration file")
|
||||
dest := flag.String("dest", "", "Destination root directory (required)")
|
||||
dryRun := flag.Bool("dry-run", false, "Preview what would be migrated without downloading")
|
||||
verbose := flag.Bool("verbose", false, "Log every file, not just errors and summary")
|
||||
flag.Parse()
|
||||
|
||||
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
||||
|
||||
if *dest == "" {
|
||||
logger.Fatal().Msg("-dest is required")
|
||||
}
|
||||
|
||||
// Load config (reuses existing config for DB + MinIO credentials).
|
||||
cfg, err := config.Load(*configPath)
|
||||
if err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to load configuration")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Connect to PostgreSQL.
|
||||
database, err := db.Connect(ctx, db.Config{
|
||||
Host: cfg.Database.Host,
|
||||
Port: cfg.Database.Port,
|
||||
Name: cfg.Database.Name,
|
||||
User: cfg.Database.User,
|
||||
Password: cfg.Database.Password,
|
||||
SSLMode: cfg.Database.SSLMode,
|
||||
MaxConnections: cfg.Database.MaxConnections,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to connect to database")
|
||||
}
|
||||
defer database.Close()
|
||||
logger.Info().Msg("connected to database")
|
||||
|
||||
// Connect to MinIO.
|
||||
store, err := storage.Connect(ctx, storage.Config{
|
||||
Endpoint: cfg.Storage.Endpoint,
|
||||
AccessKey: cfg.Storage.AccessKey,
|
||||
SecretKey: cfg.Storage.SecretKey,
|
||||
Bucket: cfg.Storage.Bucket,
|
||||
UseSSL: cfg.Storage.UseSSL,
|
||||
Region: cfg.Storage.Region,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to connect to MinIO")
|
||||
}
|
||||
logger.Info().Str("bucket", cfg.Storage.Bucket).Msg("connected to MinIO")
|
||||
|
||||
// Collect all file references from the database.
|
||||
entries, err := collectEntries(ctx, logger, database)
|
||||
if err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to collect file entries from database")
|
||||
}
|
||||
logger.Info().Int("total", len(entries)).Msg("file entries found")
|
||||
|
||||
if len(entries) == 0 {
|
||||
logger.Info().Msg("nothing to migrate")
|
||||
return
|
||||
}
|
||||
|
||||
// Migrate.
|
||||
var migrated, skipped, failed int
|
||||
start := time.Now()
|
||||
|
||||
for i, e := range entries {
|
||||
destPath := filepath.Join(*dest, e.key)
|
||||
|
||||
// Check if already migrated.
|
||||
if info, err := os.Stat(destPath); err == nil {
|
||||
if e.size > 0 && info.Size() == e.size {
|
||||
if *verbose {
|
||||
logger.Info().Str("key", e.key).Msg("skipped (already exists)")
|
||||
}
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
// Size mismatch or unknown size — re-download.
|
||||
}
|
||||
|
||||
if *dryRun {
|
||||
logger.Info().
|
||||
Str("key", e.key).
|
||||
Int64("size", e.size).
|
||||
Str("version", e.versionID).
|
||||
Msgf("[%d/%d] would migrate", i+1, len(entries))
|
||||
continue
|
||||
}
|
||||
|
||||
if err := migrateFile(ctx, store, e, destPath); err != nil {
|
||||
logger.Error().Err(err).Str("key", e.key).Msg("failed to migrate")
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
migrated++
|
||||
if *verbose {
|
||||
logger.Info().
|
||||
Str("key", e.key).
|
||||
Int64("size", e.size).
|
||||
Msgf("[%d/%d] migrated", i+1, len(entries))
|
||||
} else if (i+1)%50 == 0 {
|
||||
logger.Info().Msgf("progress: %d/%d", i+1, len(entries))
|
||||
}
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
ev := logger.Info().
|
||||
Int("total", len(entries)).
|
||||
Int("migrated", migrated).
|
||||
Int("skipped", skipped).
|
||||
Int("failed", failed).
|
||||
Dur("elapsed", elapsed)
|
||||
if *dryRun {
|
||||
ev.Msg("dry run complete")
|
||||
} else {
|
||||
ev.Msg("migration complete")
|
||||
}
|
||||
|
||||
if failed > 0 {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// collectEntries queries the database for all file references across the three
|
||||
// storage domains: revision files, item file attachments, and item thumbnails.
|
||||
// It deduplicates by key.
|
||||
func collectEntries(ctx context.Context, logger zerolog.Logger, database *db.DB) ([]fileEntry, error) {
|
||||
pool := database.Pool()
|
||||
seen := make(map[string]struct{})
|
||||
var entries []fileEntry
|
||||
|
||||
add := func(key, versionID string, size int64) {
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
if _, ok := seen[key]; ok {
|
||||
return
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
entries = append(entries, fileEntry{key: key, versionID: versionID, size: size})
|
||||
}
|
||||
|
||||
// 1. Revision files.
|
||||
rows, err := pool.Query(ctx,
|
||||
`SELECT file_key, COALESCE(file_version, ''), COALESCE(file_size, 0)
|
||||
FROM revisions WHERE file_key IS NOT NULL`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying revisions: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var key, version string
|
||||
var size int64
|
||||
if err := rows.Scan(&key, &version, &size); err != nil {
|
||||
rows.Close()
|
||||
return nil, fmt.Errorf("scanning revision row: %w", err)
|
||||
}
|
||||
add(key, version, size)
|
||||
}
|
||||
rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterating revisions: %w", err)
|
||||
}
|
||||
logger.Info().Int("count", len(entries)).Msg("revision files found")
|
||||
|
||||
// 2. Item file attachments.
|
||||
countBefore := len(entries)
|
||||
rows, err = pool.Query(ctx,
|
||||
`SELECT object_key, size FROM item_files`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying item_files: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var size int64
|
||||
if err := rows.Scan(&key, &size); err != nil {
|
||||
rows.Close()
|
||||
return nil, fmt.Errorf("scanning item_files row: %w", err)
|
||||
}
|
||||
add(key, "", size)
|
||||
}
|
||||
rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterating item_files: %w", err)
|
||||
}
|
||||
logger.Info().Int("count", len(entries)-countBefore).Msg("item file attachments found")
|
||||
|
||||
// 3. Item thumbnails.
|
||||
countBefore = len(entries)
|
||||
rows, err = pool.Query(ctx,
|
||||
`SELECT thumbnail_key FROM items WHERE thumbnail_key IS NOT NULL`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying item thumbnails: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
rows.Close()
|
||||
return nil, fmt.Errorf("scanning thumbnail row: %w", err)
|
||||
}
|
||||
add(key, "", 0)
|
||||
}
|
||||
rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterating thumbnails: %w", err)
|
||||
}
|
||||
logger.Info().Int("count", len(entries)-countBefore).Msg("item thumbnails found")
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// migrateFile downloads a single file from MinIO and writes it atomically to destPath.
|
||||
func migrateFile(ctx context.Context, store *storage.Storage, e fileEntry, destPath string) error {
|
||||
// Ensure parent directory exists.
|
||||
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
||||
return fmt.Errorf("creating directory: %w", err)
|
||||
}
|
||||
|
||||
// Download from MinIO.
|
||||
var reader io.ReadCloser
|
||||
var err error
|
||||
if e.versionID != "" {
|
||||
reader, err = store.GetVersion(ctx, e.key, e.versionID)
|
||||
} else {
|
||||
reader, err = store.Get(ctx, e.key)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("downloading from MinIO: %w", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
// Write to temp file then rename for atomicity.
|
||||
tmpPath := destPath + ".tmp"
|
||||
f, err := os.Create(tmpPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating temp file: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(f, reader); err != nil {
|
||||
f.Close()
|
||||
os.Remove(tmpPath)
|
||||
return fmt.Errorf("writing file: %w", err)
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return fmt.Errorf("closing temp file: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, destPath); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return fmt.Errorf("renaming temp file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user