Merge pull request 'feat(storage): FileStore interface abstraction + filesystem backend' (#134) from feat-storage-interface-filesystem into main
Reviewed-on: #134
This commit was merged in pull request #134.
This commit is contained in:
@@ -65,24 +65,39 @@ func main() {
|
|||||||
logger.Info().Msg("connected to database")
|
logger.Info().Msg("connected to database")
|
||||||
|
|
||||||
// Connect to storage (optional - may be externally managed)
|
// Connect to storage (optional - may be externally managed)
|
||||||
var store *storage.Storage
|
var store storage.FileStore
|
||||||
if cfg.Storage.Endpoint != "" {
|
switch cfg.Storage.Backend {
|
||||||
store, err = storage.Connect(ctx, storage.Config{
|
case "minio", "":
|
||||||
Endpoint: cfg.Storage.Endpoint,
|
if cfg.Storage.Endpoint != "" {
|
||||||
AccessKey: cfg.Storage.AccessKey,
|
s, connErr := storage.Connect(ctx, storage.Config{
|
||||||
SecretKey: cfg.Storage.SecretKey,
|
Endpoint: cfg.Storage.Endpoint,
|
||||||
Bucket: cfg.Storage.Bucket,
|
AccessKey: cfg.Storage.AccessKey,
|
||||||
UseSSL: cfg.Storage.UseSSL,
|
SecretKey: cfg.Storage.SecretKey,
|
||||||
Region: cfg.Storage.Region,
|
Bucket: cfg.Storage.Bucket,
|
||||||
})
|
UseSSL: cfg.Storage.UseSSL,
|
||||||
if err != nil {
|
Region: cfg.Storage.Region,
|
||||||
logger.Warn().Err(err).Msg("failed to connect to storage - file operations disabled")
|
})
|
||||||
store = nil
|
if connErr != nil {
|
||||||
|
logger.Warn().Err(connErr).Msg("failed to connect to storage - file operations disabled")
|
||||||
|
} else {
|
||||||
|
store = s
|
||||||
|
logger.Info().Msg("connected to storage")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.Info().Msg("connected to storage")
|
logger.Info().Msg("storage not configured - file operations disabled")
|
||||||
}
|
}
|
||||||
} else {
|
case "filesystem":
|
||||||
logger.Info().Msg("storage not configured - file operations disabled")
|
if cfg.Storage.Filesystem.RootDir == "" {
|
||||||
|
logger.Fatal().Msg("storage.filesystem.root_dir is required when backend is \"filesystem\"")
|
||||||
|
}
|
||||||
|
s, fsErr := storage.NewFilesystemStore(cfg.Storage.Filesystem.RootDir)
|
||||||
|
if fsErr != nil {
|
||||||
|
logger.Fatal().Err(fsErr).Msg("failed to initialize filesystem storage")
|
||||||
|
}
|
||||||
|
store = s
|
||||||
|
logger.Info().Str("root", cfg.Storage.Filesystem.RootDir).Msg("connected to filesystem storage")
|
||||||
|
default:
|
||||||
|
logger.Fatal().Str("backend", cfg.Storage.Backend).Msg("unknown storage backend")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load schemas
|
// Load schemas
|
||||||
|
|||||||
@@ -17,12 +17,17 @@ database:
|
|||||||
max_connections: 10
|
max_connections: 10
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
|
backend: "minio" # "minio" (default) or "filesystem"
|
||||||
|
# MinIO/S3 settings (used when backend: "minio")
|
||||||
endpoint: "localhost:9000" # Use "minio:9000" for Docker Compose
|
endpoint: "localhost:9000" # Use "minio:9000" for Docker Compose
|
||||||
access_key: "" # Use SILO_MINIO_ACCESS_KEY env var
|
access_key: "" # Use SILO_MINIO_ACCESS_KEY env var
|
||||||
secret_key: "" # Use SILO_MINIO_SECRET_KEY env var
|
secret_key: "" # Use SILO_MINIO_SECRET_KEY env var
|
||||||
bucket: "silo-files"
|
bucket: "silo-files"
|
||||||
use_ssl: true # Use false for Docker Compose (internal network)
|
use_ssl: true # Use false for Docker Compose (internal network)
|
||||||
region: "us-east-1"
|
region: "us-east-1"
|
||||||
|
# Filesystem settings (used when backend: "filesystem")
|
||||||
|
# filesystem:
|
||||||
|
# root_dir: "/var/lib/silo/objects"
|
||||||
|
|
||||||
schemas:
|
schemas:
|
||||||
# Directory containing YAML schema files
|
# Directory containing YAML schema files
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ type Server struct {
|
|||||||
schemas map[string]*schema.Schema
|
schemas map[string]*schema.Schema
|
||||||
schemasDir string
|
schemasDir string
|
||||||
partgen *partnum.Generator
|
partgen *partnum.Generator
|
||||||
storage *storage.Storage
|
storage storage.FileStore
|
||||||
auth *auth.Service
|
auth *auth.Service
|
||||||
sessions *scs.SessionManager
|
sessions *scs.SessionManager
|
||||||
oidc *auth.OIDCBackend
|
oidc *auth.OIDCBackend
|
||||||
@@ -61,7 +61,7 @@ func NewServer(
|
|||||||
database *db.DB,
|
database *db.DB,
|
||||||
schemas map[string]*schema.Schema,
|
schemas map[string]*schema.Schema,
|
||||||
schemasDir string,
|
schemasDir string,
|
||||||
store *storage.Storage,
|
store storage.FileStore,
|
||||||
authService *auth.Service,
|
authService *auth.Service,
|
||||||
sessionManager *scs.SessionManager,
|
sessionManager *scs.SessionManager,
|
||||||
oidcBackend *auth.OIDCBackend,
|
oidcBackend *auth.OIDCBackend,
|
||||||
|
|||||||
@@ -26,13 +26,13 @@ type ServerState struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
readOnly bool
|
readOnly bool
|
||||||
storageOK bool
|
storageOK bool
|
||||||
storage *storage.Storage
|
storage storage.FileStore
|
||||||
broker *Broker
|
broker *Broker
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServerState creates a new server state tracker.
|
// NewServerState creates a new server state tracker.
|
||||||
func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState {
|
func NewServerState(logger zerolog.Logger, store storage.FileStore, broker *Broker) *ServerState {
|
||||||
return &ServerState{
|
return &ServerState{
|
||||||
logger: logger.With().Str("component", "server-state").Logger(),
|
logger: logger.With().Str("component", "server-state").Logger(),
|
||||||
storageOK: store != nil, // assume healthy if configured
|
storageOK: store != nil, // assume healthy if configured
|
||||||
|
|||||||
@@ -109,14 +109,21 @@ type DatabaseConfig struct {
|
|||||||
MaxConnections int `yaml:"max_connections"`
|
MaxConnections int `yaml:"max_connections"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorageConfig holds MinIO connection settings.
|
// StorageConfig holds object storage settings.
|
||||||
type StorageConfig struct {
|
type StorageConfig struct {
|
||||||
Endpoint string `yaml:"endpoint"`
|
Backend string `yaml:"backend"` // "minio" (default) or "filesystem"
|
||||||
AccessKey string `yaml:"access_key"`
|
Endpoint string `yaml:"endpoint"`
|
||||||
SecretKey string `yaml:"secret_key"`
|
AccessKey string `yaml:"access_key"`
|
||||||
Bucket string `yaml:"bucket"`
|
SecretKey string `yaml:"secret_key"`
|
||||||
UseSSL bool `yaml:"use_ssl"`
|
Bucket string `yaml:"bucket"`
|
||||||
Region string `yaml:"region"`
|
UseSSL bool `yaml:"use_ssl"`
|
||||||
|
Region string `yaml:"region"`
|
||||||
|
Filesystem FilesystemConfig `yaml:"filesystem"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilesystemConfig holds local filesystem storage settings.
|
||||||
|
type FilesystemConfig struct {
|
||||||
|
RootDir string `yaml:"root_dir"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SchemasConfig holds schema loading settings.
|
// SchemasConfig holds schema loading settings.
|
||||||
|
|||||||
177
internal/storage/filesystem.go
Normal file
177
internal/storage/filesystem.go
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrPresignNotSupported is returned when presigned URLs are requested from a
|
||||||
|
// backend that does not support them.
|
||||||
|
var ErrPresignNotSupported = errors.New("presigned URLs not supported by filesystem backend")
|
||||||
|
|
||||||
|
// Compile-time check: *FilesystemStore implements FileStore.
|
||||||
|
var _ FileStore = (*FilesystemStore)(nil)
|
||||||
|
|
||||||
|
// FilesystemStore stores objects as files under a root directory.
|
||||||
|
type FilesystemStore struct {
|
||||||
|
root string // absolute path
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFilesystemStore creates a new filesystem-backed store rooted at root.
|
||||||
|
// The directory is created if it does not exist.
|
||||||
|
func NewFilesystemStore(root string) (*FilesystemStore, error) {
|
||||||
|
abs, err := filepath.Abs(root)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("resolving root path: %w", err)
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(abs, 0o755); err != nil {
|
||||||
|
return nil, fmt.Errorf("creating root directory: %w", err)
|
||||||
|
}
|
||||||
|
return &FilesystemStore{root: abs}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// path returns the absolute filesystem path for a storage key.
|
||||||
|
func (fs *FilesystemStore) path(key string) string {
|
||||||
|
return filepath.Join(fs.root, filepath.FromSlash(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put writes reader to the file at key using atomic rename.
|
||||||
|
// SHA-256 checksum is computed during write and returned in PutResult.
|
||||||
|
func (fs *FilesystemStore) Put(_ context.Context, key string, reader io.Reader, _ int64, _ string) (*PutResult, error) {
|
||||||
|
dest := fs.path(key)
|
||||||
|
|
||||||
|
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
|
||||||
|
return nil, fmt.Errorf("creating directories: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write to a temp file in the same directory so os.Rename is atomic.
|
||||||
|
tmp, err := os.CreateTemp(filepath.Dir(dest), ".silo-tmp-*")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating temp file: %w", err)
|
||||||
|
}
|
||||||
|
tmpPath := tmp.Name()
|
||||||
|
defer func() {
|
||||||
|
// Clean up temp file on any failure path.
|
||||||
|
tmp.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
}()
|
||||||
|
|
||||||
|
h := sha256.New()
|
||||||
|
w := io.MultiWriter(tmp, h)
|
||||||
|
|
||||||
|
n, err := io.Copy(w, reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("writing file: %w", err)
|
||||||
|
}
|
||||||
|
if err := tmp.Close(); err != nil {
|
||||||
|
return nil, fmt.Errorf("closing temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Rename(tmpPath, dest); err != nil {
|
||||||
|
return nil, fmt.Errorf("renaming temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PutResult{
|
||||||
|
Key: key,
|
||||||
|
Size: n,
|
||||||
|
Checksum: hex.EncodeToString(h.Sum(nil)),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get opens the file at key for reading.
|
||||||
|
func (fs *FilesystemStore) Get(_ context.Context, key string) (io.ReadCloser, error) {
|
||||||
|
f, err := os.Open(fs.path(key))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening file: %w", err)
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVersion delegates to Get — filesystem storage has no versioning.
|
||||||
|
func (fs *FilesystemStore) GetVersion(ctx context.Context, key string, _ string) (io.ReadCloser, error) {
|
||||||
|
return fs.Get(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the file at key. No error if already absent.
|
||||||
|
func (fs *FilesystemStore) Delete(_ context.Context, key string) error {
|
||||||
|
err := os.Remove(fs.path(key))
|
||||||
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
return fmt.Errorf("removing file: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exists reports whether the file at key exists.
|
||||||
|
func (fs *FilesystemStore) Exists(_ context.Context, key string) (bool, error) {
|
||||||
|
_, err := os.Stat(fs.path(key))
|
||||||
|
if err == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("checking file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy duplicates a file from srcKey to dstKey using atomic rename.
|
||||||
|
func (fs *FilesystemStore) Copy(_ context.Context, srcKey, dstKey string) error {
|
||||||
|
srcPath := fs.path(srcKey)
|
||||||
|
dstPath := fs.path(dstKey)
|
||||||
|
|
||||||
|
src, err := os.Open(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("opening source: %w", err)
|
||||||
|
}
|
||||||
|
defer src.Close()
|
||||||
|
|
||||||
|
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
|
||||||
|
return fmt.Errorf("creating directories: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmp, err := os.CreateTemp(filepath.Dir(dstPath), ".silo-tmp-*")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating temp file: %w", err)
|
||||||
|
}
|
||||||
|
tmpPath := tmp.Name()
|
||||||
|
defer func() {
|
||||||
|
tmp.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
}()
|
||||||
|
|
||||||
|
if _, err := io.Copy(tmp, src); err != nil {
|
||||||
|
return fmt.Errorf("copying file: %w", err)
|
||||||
|
}
|
||||||
|
if err := tmp.Close(); err != nil {
|
||||||
|
return fmt.Errorf("closing temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Rename(tmpPath, dstPath); err != nil {
|
||||||
|
return fmt.Errorf("renaming temp file: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PresignPut is not supported by the filesystem backend.
|
||||||
|
func (fs *FilesystemStore) PresignPut(_ context.Context, _ string, _ time.Duration) (*url.URL, error) {
|
||||||
|
return nil, ErrPresignNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping verifies the root directory is accessible and writable.
|
||||||
|
func (fs *FilesystemStore) Ping(_ context.Context) error {
|
||||||
|
tmp, err := os.CreateTemp(fs.root, ".silo-ping-*")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("storage ping failed: %w", err)
|
||||||
|
}
|
||||||
|
name := tmp.Name()
|
||||||
|
tmp.Close()
|
||||||
|
os.Remove(name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
277
internal/storage/filesystem_test.go
Normal file
277
internal/storage/filesystem_test.go
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestStore(t *testing.T) *FilesystemStore {
|
||||||
|
t.Helper()
|
||||||
|
fs, err := NewFilesystemStore(t.TempDir())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewFilesystemStore: %v", err)
|
||||||
|
}
|
||||||
|
return fs
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewFilesystemStore(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
sub := filepath.Join(dir, "a", "b")
|
||||||
|
fs, err := NewFilesystemStore(sub)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !filepath.IsAbs(fs.root) {
|
||||||
|
t.Errorf("root is not absolute: %s", fs.root)
|
||||||
|
}
|
||||||
|
info, err := os.Stat(sub)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("root dir missing: %v", err)
|
||||||
|
}
|
||||||
|
if !info.IsDir() {
|
||||||
|
t.Error("root is not a directory")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPut(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
data := []byte("hello world")
|
||||||
|
h := sha256.Sum256(data)
|
||||||
|
wantChecksum := hex.EncodeToString(h[:])
|
||||||
|
|
||||||
|
result, err := fs.Put(ctx, "items/P001/rev1.FCStd", bytes.NewReader(data), int64(len(data)), "application/octet-stream")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
if result.Key != "items/P001/rev1.FCStd" {
|
||||||
|
t.Errorf("Key = %q, want %q", result.Key, "items/P001/rev1.FCStd")
|
||||||
|
}
|
||||||
|
if result.Size != int64(len(data)) {
|
||||||
|
t.Errorf("Size = %d, want %d", result.Size, len(data))
|
||||||
|
}
|
||||||
|
if result.Checksum != wantChecksum {
|
||||||
|
t.Errorf("Checksum = %q, want %q", result.Checksum, wantChecksum)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify file on disk.
|
||||||
|
got, err := os.ReadFile(fs.path("items/P001/rev1.FCStd"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("reading file: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(got, data) {
|
||||||
|
t.Error("file content mismatch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutAtomicity(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
key := "test/atomic.bin"
|
||||||
|
|
||||||
|
// Write an initial file.
|
||||||
|
if _, err := fs.Put(ctx, key, strings.NewReader("original"), 8, ""); err != nil {
|
||||||
|
t.Fatalf("initial Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write with a reader that fails partway through.
|
||||||
|
failing := io.MultiReader(strings.NewReader("partial"), &errReader{})
|
||||||
|
_, err := fs.Put(ctx, key, failing, 100, "")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error from failing reader")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Original file should still be intact.
|
||||||
|
got, err := os.ReadFile(fs.path(key))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("reading file after failed put: %v", err)
|
||||||
|
}
|
||||||
|
if string(got) != "original" {
|
||||||
|
t.Errorf("file content = %q, want %q", got, "original")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type errReader struct{}
|
||||||
|
|
||||||
|
func (e *errReader) Read([]byte) (int, error) {
|
||||||
|
return 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGet(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
data := []byte("test content")
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "f.txt", bytes.NewReader(data), int64(len(data)), ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rc, err := fs.Get(ctx, "f.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Get: %v", err)
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
got, err := io.ReadAll(rc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ReadAll: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(got, data) {
|
||||||
|
t.Error("content mismatch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetMissing(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
_, err := fs.Get(context.Background(), "no/such/file")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error for missing file")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetVersion(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
data := []byte("versioned")
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "v.txt", bytes.NewReader(data), int64(len(data)), ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVersion ignores versionID, returns same file.
|
||||||
|
rc, err := fs.GetVersion(ctx, "v.txt", "ignored-version-id")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetVersion: %v", err)
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
got, err := io.ReadAll(rc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ReadAll: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(got, data) {
|
||||||
|
t.Error("content mismatch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDelete(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "del.txt", strings.NewReader("x"), 1, ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fs.Delete(ctx, "del.txt"); err != nil {
|
||||||
|
t.Fatalf("Delete: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(fs.path("del.txt")); !os.IsNotExist(err) {
|
||||||
|
t.Error("file still exists after delete")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteMissing(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
if err := fs.Delete(context.Background(), "no/such/file"); err != nil {
|
||||||
|
t.Fatalf("Delete missing file should not error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExists(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
ok, err := fs.Exists(ctx, "nope")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Exists: %v", err)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
t.Error("Exists returned true for missing file")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "yes.txt", strings.NewReader("y"), 1, ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err = fs.Exists(ctx, "yes.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Exists: %v", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Error("Exists returned false for existing file")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCopy(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
data := []byte("copy me")
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "src.bin", bytes.NewReader(data), int64(len(data)), ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fs.Copy(ctx, "src.bin", "deep/nested/dst.bin"); err != nil {
|
||||||
|
t.Fatalf("Copy: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := os.ReadFile(fs.path("deep/nested/dst.bin"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("reading copied file: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(got, data) {
|
||||||
|
t.Error("copied content mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Source should still exist.
|
||||||
|
if _, err := os.Stat(fs.path("src.bin")); err != nil {
|
||||||
|
t.Error("source file missing after copy")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPresignPut(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
_, err := fs.PresignPut(context.Background(), "key", 5*60)
|
||||||
|
if err != ErrPresignNotSupported {
|
||||||
|
t.Errorf("PresignPut error = %v, want ErrPresignNotSupported", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPing(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
if err := fs.Ping(context.Background()); err != nil {
|
||||||
|
t.Fatalf("Ping: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPingBadRoot(t *testing.T) {
|
||||||
|
fs := &FilesystemStore{root: "/nonexistent/path/that/should/not/exist"}
|
||||||
|
if err := fs.Ping(context.Background()); err == nil {
|
||||||
|
t.Fatal("expected Ping to fail with invalid root")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutOverwrite(t *testing.T) {
|
||||||
|
fs := newTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if _, err := fs.Put(ctx, "ow.txt", strings.NewReader("first"), 5, ""); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := fs.Put(ctx, "ow.txt", strings.NewReader("second"), 6, ""); err != nil {
|
||||||
|
t.Fatalf("Put overwrite: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, _ := os.ReadFile(fs.path("ow.txt"))
|
||||||
|
if string(got) != "second" {
|
||||||
|
t.Errorf("content = %q, want %q", got, "second")
|
||||||
|
}
|
||||||
|
}
|
||||||
21
internal/storage/interface.go
Normal file
21
internal/storage/interface.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
// Package storage defines the FileStore interface and backend implementations.
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FileStore is the interface for file storage backends.
|
||||||
|
type FileStore interface {
|
||||||
|
Put(ctx context.Context, key string, reader io.Reader, size int64, contentType string) (*PutResult, error)
|
||||||
|
Get(ctx context.Context, key string) (io.ReadCloser, error)
|
||||||
|
GetVersion(ctx context.Context, key string, versionID string) (io.ReadCloser, error)
|
||||||
|
Delete(ctx context.Context, key string) error
|
||||||
|
Exists(ctx context.Context, key string) (bool, error)
|
||||||
|
Copy(ctx context.Context, srcKey, dstKey string) error
|
||||||
|
PresignPut(ctx context.Context, key string, expiry time.Duration) (*url.URL, error)
|
||||||
|
Ping(ctx context.Context) error
|
||||||
|
}
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
// Package storage provides MinIO file storage operations.
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -22,6 +21,9 @@ type Config struct {
|
|||||||
Region string
|
Region string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compile-time check: *Storage implements FileStore.
|
||||||
|
var _ FileStore = (*Storage)(nil)
|
||||||
|
|
||||||
// Storage wraps MinIO client operations.
|
// Storage wraps MinIO client operations.
|
||||||
type Storage struct {
|
type Storage struct {
|
||||||
client *minio.Client
|
client *minio.Client
|
||||||
@@ -112,6 +114,19 @@ func (s *Storage) Delete(ctx context.Context, key string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exists checks if an object exists in storage.
|
||||||
|
func (s *Storage) Exists(ctx context.Context, key string) (bool, error) {
|
||||||
|
_, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{})
|
||||||
|
if err != nil {
|
||||||
|
resp := minio.ToErrorResponse(err)
|
||||||
|
if resp.Code == "NoSuchKey" {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("checking object existence: %w", err)
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Ping checks if the storage backend is reachable by verifying the bucket exists.
|
// Ping checks if the storage backend is reachable by verifying the bucket exists.
|
||||||
func (s *Storage) Ping(ctx context.Context) error {
|
func (s *Storage) Ping(ctx context.Context) error {
|
||||||
_, err := s.client.BucketExists(ctx, s.bucket)
|
_, err := s.client.BucketExists(ctx, s.bucket)
|
||||||
|
|||||||
Reference in New Issue
Block a user