Merge pull request 'feat(storage): add MinIO to filesystem migration tool' (#137) from feat/migrate-storage-tool into main
Reviewed-on: #137
This commit was merged in pull request #137.
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,6 +1,7 @@
|
|||||||
# Binaries
|
# Binaries
|
||||||
/silo
|
/silo
|
||||||
/silod
|
/silod
|
||||||
|
/migrate-storage
|
||||||
*.exe
|
*.exe
|
||||||
*.dll
|
*.dll
|
||||||
*.so
|
*.so
|
||||||
|
|||||||
10
Makefile
10
Makefile
@@ -1,7 +1,8 @@
|
|||||||
.PHONY: build run test test-integration clean migrate fmt lint \
|
.PHONY: build run test test-integration clean migrate fmt lint \
|
||||||
docker-build docker-up docker-down docker-logs docker-ps \
|
docker-build docker-up docker-down docker-logs docker-ps \
|
||||||
docker-clean docker-rebuild \
|
docker-clean docker-rebuild \
|
||||||
web-install web-dev web-build
|
web-install web-dev web-build \
|
||||||
|
migrate-storage
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# Local Development
|
# Local Development
|
||||||
@@ -56,6 +57,13 @@ tidy:
|
|||||||
migrate:
|
migrate:
|
||||||
./scripts/init-db.sh
|
./scripts/init-db.sh
|
||||||
|
|
||||||
|
# Build and run MinIO → filesystem migration tool
|
||||||
|
# Usage: make migrate-storage DEST=/opt/silo/data [ARGS="--dry-run --verbose"]
|
||||||
|
migrate-storage:
|
||||||
|
go build -o migrate-storage ./cmd/migrate-storage
|
||||||
|
@echo "Built ./migrate-storage"
|
||||||
|
@echo "Run: ./migrate-storage -config <config.yaml> -dest <dir> [-dry-run] [-verbose]"
|
||||||
|
|
||||||
# Connect to database (requires psql)
|
# Connect to database (requires psql)
|
||||||
db-shell:
|
db-shell:
|
||||||
PGPASSWORD=$${SILO_DB_PASSWORD:-silodev} psql -h $${SILO_DB_HOST:-localhost} -U $${SILO_DB_USER:-silo} -d $${SILO_DB_NAME:-silo}
|
PGPASSWORD=$${SILO_DB_PASSWORD:-silodev} psql -h $${SILO_DB_HOST:-localhost} -U $${SILO_DB_USER:-silo} -d $${SILO_DB_NAME:-silo}
|
||||||
|
|||||||
288
cmd/migrate-storage/main.go
Normal file
288
cmd/migrate-storage/main.go
Normal file
@@ -0,0 +1,288 @@
|
|||||||
|
// Command migrate-storage downloads files from MinIO and writes them to the
|
||||||
|
// local filesystem. It is a one-shot migration tool for moving off MinIO.
|
||||||
|
//
|
||||||
|
// Usage:
|
||||||
|
//
|
||||||
|
// migrate-storage -config config.yaml -dest /opt/silo/data [-dry-run] [-verbose]
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/kindredsystems/silo/internal/config"
|
||||||
|
"github.com/kindredsystems/silo/internal/db"
|
||||||
|
"github.com/kindredsystems/silo/internal/storage"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fileEntry represents a single file to migrate.
type fileEntry struct {
	key       string // MinIO object key; also used as the path relative to -dest
	versionID string // MinIO version ID; empty if not versioned
	size      int64  // expected size from DB; 0 if unknown
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
configPath := flag.String("config", "config.yaml", "Path to configuration file")
|
||||||
|
dest := flag.String("dest", "", "Destination root directory (required)")
|
||||||
|
dryRun := flag.Bool("dry-run", false, "Preview what would be migrated without downloading")
|
||||||
|
verbose := flag.Bool("verbose", false, "Log every file, not just errors and summary")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
||||||
|
|
||||||
|
if *dest == "" {
|
||||||
|
logger.Fatal().Msg("-dest is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load config (reuses existing config for DB + MinIO credentials).
|
||||||
|
cfg, err := config.Load(*configPath)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to load configuration")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Connect to PostgreSQL.
|
||||||
|
database, err := db.Connect(ctx, db.Config{
|
||||||
|
Host: cfg.Database.Host,
|
||||||
|
Port: cfg.Database.Port,
|
||||||
|
Name: cfg.Database.Name,
|
||||||
|
User: cfg.Database.User,
|
||||||
|
Password: cfg.Database.Password,
|
||||||
|
SSLMode: cfg.Database.SSLMode,
|
||||||
|
MaxConnections: cfg.Database.MaxConnections,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to connect to database")
|
||||||
|
}
|
||||||
|
defer database.Close()
|
||||||
|
logger.Info().Msg("connected to database")
|
||||||
|
|
||||||
|
// Connect to MinIO.
|
||||||
|
store, err := storage.Connect(ctx, storage.Config{
|
||||||
|
Endpoint: cfg.Storage.Endpoint,
|
||||||
|
AccessKey: cfg.Storage.AccessKey,
|
||||||
|
SecretKey: cfg.Storage.SecretKey,
|
||||||
|
Bucket: cfg.Storage.Bucket,
|
||||||
|
UseSSL: cfg.Storage.UseSSL,
|
||||||
|
Region: cfg.Storage.Region,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to connect to MinIO")
|
||||||
|
}
|
||||||
|
logger.Info().Str("bucket", cfg.Storage.Bucket).Msg("connected to MinIO")
|
||||||
|
|
||||||
|
// Collect all file references from the database.
|
||||||
|
entries, err := collectEntries(ctx, logger, database)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to collect file entries from database")
|
||||||
|
}
|
||||||
|
logger.Info().Int("total", len(entries)).Msg("file entries found")
|
||||||
|
|
||||||
|
if len(entries) == 0 {
|
||||||
|
logger.Info().Msg("nothing to migrate")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate.
|
||||||
|
var migrated, skipped, failed int
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
for i, e := range entries {
|
||||||
|
destPath := filepath.Join(*dest, e.key)
|
||||||
|
|
||||||
|
// Check if already migrated.
|
||||||
|
if info, err := os.Stat(destPath); err == nil {
|
||||||
|
if e.size > 0 && info.Size() == e.size {
|
||||||
|
if *verbose {
|
||||||
|
logger.Info().Str("key", e.key).Msg("skipped (already exists)")
|
||||||
|
}
|
||||||
|
skipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Size mismatch or unknown size — re-download.
|
||||||
|
}
|
||||||
|
|
||||||
|
if *dryRun {
|
||||||
|
logger.Info().
|
||||||
|
Str("key", e.key).
|
||||||
|
Int64("size", e.size).
|
||||||
|
Str("version", e.versionID).
|
||||||
|
Msgf("[%d/%d] would migrate", i+1, len(entries))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := migrateFile(ctx, store, e, destPath); err != nil {
|
||||||
|
logger.Error().Err(err).Str("key", e.key).Msg("failed to migrate")
|
||||||
|
failed++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
migrated++
|
||||||
|
if *verbose {
|
||||||
|
logger.Info().
|
||||||
|
Str("key", e.key).
|
||||||
|
Int64("size", e.size).
|
||||||
|
Msgf("[%d/%d] migrated", i+1, len(entries))
|
||||||
|
} else if (i+1)%50 == 0 {
|
||||||
|
logger.Info().Msgf("progress: %d/%d", i+1, len(entries))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
ev := logger.Info().
|
||||||
|
Int("total", len(entries)).
|
||||||
|
Int("migrated", migrated).
|
||||||
|
Int("skipped", skipped).
|
||||||
|
Int("failed", failed).
|
||||||
|
Dur("elapsed", elapsed)
|
||||||
|
if *dryRun {
|
||||||
|
ev.Msg("dry run complete")
|
||||||
|
} else {
|
||||||
|
ev.Msg("migration complete")
|
||||||
|
}
|
||||||
|
|
||||||
|
if failed > 0 {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// collectEntries queries the database for all file references across the three
|
||||||
|
// storage domains: revision files, item file attachments, and item thumbnails.
|
||||||
|
// It deduplicates by key.
|
||||||
|
func collectEntries(ctx context.Context, logger zerolog.Logger, database *db.DB) ([]fileEntry, error) {
|
||||||
|
pool := database.Pool()
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
var entries []fileEntry
|
||||||
|
|
||||||
|
add := func(key, versionID string, size int64) {
|
||||||
|
if key == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, ok := seen[key]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
seen[key] = struct{}{}
|
||||||
|
entries = append(entries, fileEntry{key: key, versionID: versionID, size: size})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. Revision files.
|
||||||
|
rows, err := pool.Query(ctx,
|
||||||
|
`SELECT file_key, COALESCE(file_version, ''), COALESCE(file_size, 0)
|
||||||
|
FROM revisions WHERE file_key IS NOT NULL`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("querying revisions: %w", err)
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var key, version string
|
||||||
|
var size int64
|
||||||
|
if err := rows.Scan(&key, &version, &size); err != nil {
|
||||||
|
rows.Close()
|
||||||
|
return nil, fmt.Errorf("scanning revision row: %w", err)
|
||||||
|
}
|
||||||
|
add(key, version, size)
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("iterating revisions: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info().Int("count", len(entries)).Msg("revision files found")
|
||||||
|
|
||||||
|
// 2. Item file attachments.
|
||||||
|
countBefore := len(entries)
|
||||||
|
rows, err = pool.Query(ctx,
|
||||||
|
`SELECT object_key, size FROM item_files`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("querying item_files: %w", err)
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var key string
|
||||||
|
var size int64
|
||||||
|
if err := rows.Scan(&key, &size); err != nil {
|
||||||
|
rows.Close()
|
||||||
|
return nil, fmt.Errorf("scanning item_files row: %w", err)
|
||||||
|
}
|
||||||
|
add(key, "", size)
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("iterating item_files: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info().Int("count", len(entries)-countBefore).Msg("item file attachments found")
|
||||||
|
|
||||||
|
// 3. Item thumbnails.
|
||||||
|
countBefore = len(entries)
|
||||||
|
rows, err = pool.Query(ctx,
|
||||||
|
`SELECT thumbnail_key FROM items WHERE thumbnail_key IS NOT NULL`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("querying item thumbnails: %w", err)
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var key string
|
||||||
|
if err := rows.Scan(&key); err != nil {
|
||||||
|
rows.Close()
|
||||||
|
return nil, fmt.Errorf("scanning thumbnail row: %w", err)
|
||||||
|
}
|
||||||
|
add(key, "", 0)
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("iterating thumbnails: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info().Int("count", len(entries)-countBefore).Msg("item thumbnails found")
|
||||||
|
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateFile downloads a single file from MinIO and writes it atomically to destPath.
|
||||||
|
func migrateFile(ctx context.Context, store *storage.Storage, e fileEntry, destPath string) error {
|
||||||
|
// Ensure parent directory exists.
|
||||||
|
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
||||||
|
return fmt.Errorf("creating directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download from MinIO.
|
||||||
|
var reader io.ReadCloser
|
||||||
|
var err error
|
||||||
|
if e.versionID != "" {
|
||||||
|
reader, err = store.GetVersion(ctx, e.key, e.versionID)
|
||||||
|
} else {
|
||||||
|
reader, err = store.Get(ctx, e.key)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("downloading from MinIO: %w", err)
|
||||||
|
}
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
// Write to temp file then rename for atomicity.
|
||||||
|
tmpPath := destPath + ".tmp"
|
||||||
|
f, err := os.Create(tmpPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.Copy(f, reader); err != nil {
|
||||||
|
f.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("writing file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("closing temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Rename(tmpPath, destPath); err != nil {
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("renaming temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user