feat(storage): FileStore interface abstraction + filesystem backend #134

Merged
forbes merged 2 commits from feat-storage-interface-filesystem into main 2026-02-17 17:55:10 +00:00
9 changed files with 545 additions and 28 deletions

View File

@@ -65,24 +65,39 @@ func main() {
logger.Info().Msg("connected to database")
// Connect to storage (optional - may be externally managed)
var store *storage.Storage
if cfg.Storage.Endpoint != "" {
store, err = storage.Connect(ctx, storage.Config{
Endpoint: cfg.Storage.Endpoint,
AccessKey: cfg.Storage.AccessKey,
SecretKey: cfg.Storage.SecretKey,
Bucket: cfg.Storage.Bucket,
UseSSL: cfg.Storage.UseSSL,
Region: cfg.Storage.Region,
})
if err != nil {
logger.Warn().Err(err).Msg("failed to connect to storage - file operations disabled")
store = nil
var store storage.FileStore
switch cfg.Storage.Backend {
case "minio", "":
if cfg.Storage.Endpoint != "" {
s, connErr := storage.Connect(ctx, storage.Config{
Endpoint: cfg.Storage.Endpoint,
AccessKey: cfg.Storage.AccessKey,
SecretKey: cfg.Storage.SecretKey,
Bucket: cfg.Storage.Bucket,
UseSSL: cfg.Storage.UseSSL,
Region: cfg.Storage.Region,
})
if connErr != nil {
logger.Warn().Err(connErr).Msg("failed to connect to storage - file operations disabled")
} else {
store = s
logger.Info().Msg("connected to storage")
}
} else {
logger.Info().Msg("connected to storage")
logger.Info().Msg("storage not configured - file operations disabled")
}
} else {
logger.Info().Msg("storage not configured - file operations disabled")
case "filesystem":
if cfg.Storage.Filesystem.RootDir == "" {
logger.Fatal().Msg("storage.filesystem.root_dir is required when backend is \"filesystem\"")
}
s, fsErr := storage.NewFilesystemStore(cfg.Storage.Filesystem.RootDir)
if fsErr != nil {
logger.Fatal().Err(fsErr).Msg("failed to initialize filesystem storage")
}
store = s
logger.Info().Str("root", cfg.Storage.Filesystem.RootDir).Msg("connected to filesystem storage")
default:
logger.Fatal().Str("backend", cfg.Storage.Backend).Msg("unknown storage backend")
}
// Load schemas

View File

@@ -17,12 +17,17 @@ database:
max_connections: 10
storage:
backend: "minio" # "minio" (default) or "filesystem"
# MinIO/S3 settings (used when backend: "minio")
endpoint: "localhost:9000" # Use "minio:9000" for Docker Compose
access_key: "" # Use SILO_MINIO_ACCESS_KEY env var
secret_key: "" # Use SILO_MINIO_SECRET_KEY env var
bucket: "silo-files"
use_ssl: true # Use false for Docker Compose (internal network)
region: "us-east-1"
# Filesystem settings (used when backend: "filesystem")
# filesystem:
# root_dir: "/var/lib/silo/objects"
schemas:
# Directory containing YAML schema files

View File

@@ -37,7 +37,7 @@ type Server struct {
schemas map[string]*schema.Schema
schemasDir string
partgen *partnum.Generator
storage *storage.Storage
storage storage.FileStore
auth *auth.Service
sessions *scs.SessionManager
oidc *auth.OIDCBackend
@@ -61,7 +61,7 @@ func NewServer(
database *db.DB,
schemas map[string]*schema.Schema,
schemasDir string,
store *storage.Storage,
store storage.FileStore,
authService *auth.Service,
sessionManager *scs.SessionManager,
oidcBackend *auth.OIDCBackend,

View File

@@ -26,13 +26,13 @@ type ServerState struct {
mu sync.RWMutex
readOnly bool
storageOK bool
storage *storage.Storage
storage storage.FileStore
broker *Broker
done chan struct{}
}
// NewServerState creates a new server state tracker.
func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState {
func NewServerState(logger zerolog.Logger, store storage.FileStore, broker *Broker) *ServerState {
return &ServerState{
logger: logger.With().Str("component", "server-state").Logger(),
storageOK: store != nil, // assume healthy if configured

View File

@@ -109,14 +109,21 @@ type DatabaseConfig struct {
MaxConnections int `yaml:"max_connections"`
}
// StorageConfig holds MinIO connection settings.
// StorageConfig holds object storage settings.
type StorageConfig struct {
Endpoint string `yaml:"endpoint"`
AccessKey string `yaml:"access_key"`
SecretKey string `yaml:"secret_key"`
Bucket string `yaml:"bucket"`
UseSSL bool `yaml:"use_ssl"`
Region string `yaml:"region"`
Backend string `yaml:"backend"` // "minio" (default) or "filesystem"
Endpoint string `yaml:"endpoint"`
AccessKey string `yaml:"access_key"`
SecretKey string `yaml:"secret_key"`
Bucket string `yaml:"bucket"`
UseSSL bool `yaml:"use_ssl"`
Region string `yaml:"region"`
Filesystem FilesystemConfig `yaml:"filesystem"`
}
// FilesystemConfig holds local filesystem storage settings.
type FilesystemConfig struct {
RootDir string `yaml:"root_dir"`
}
// SchemasConfig holds schema loading settings.

View File

@@ -0,0 +1,177 @@
package storage
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"time"
)
// ErrPresignNotSupported is returned when presigned URLs are requested from a
// backend that does not support them.
var ErrPresignNotSupported = errors.New("presigned URLs not supported by filesystem backend")
// Compile-time check: *FilesystemStore implements FileStore.
var _ FileStore = (*FilesystemStore)(nil)
// FilesystemStore stores objects as files under a root directory.
type FilesystemStore struct {
root string // absolute path
}
// NewFilesystemStore creates a new filesystem-backed store rooted at root.
// The directory is created if it does not exist.
func NewFilesystemStore(root string) (*FilesystemStore, error) {
abs, err := filepath.Abs(root)
if err != nil {
return nil, fmt.Errorf("resolving root path: %w", err)
}
if err := os.MkdirAll(abs, 0o755); err != nil {
return nil, fmt.Errorf("creating root directory: %w", err)
}
return &FilesystemStore{root: abs}, nil
}
// path returns the absolute filesystem path for a storage key.
func (fs *FilesystemStore) path(key string) string {
return filepath.Join(fs.root, filepath.FromSlash(key))
}
// Put writes reader to the file at key using atomic rename.
// SHA-256 checksum is computed during write and returned in PutResult.
func (fs *FilesystemStore) Put(_ context.Context, key string, reader io.Reader, _ int64, _ string) (*PutResult, error) {
dest := fs.path(key)
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
return nil, fmt.Errorf("creating directories: %w", err)
}
// Write to a temp file in the same directory so os.Rename is atomic.
tmp, err := os.CreateTemp(filepath.Dir(dest), ".silo-tmp-*")
if err != nil {
return nil, fmt.Errorf("creating temp file: %w", err)
}
tmpPath := tmp.Name()
defer func() {
// Clean up temp file on any failure path.
tmp.Close()
os.Remove(tmpPath)
}()
h := sha256.New()
w := io.MultiWriter(tmp, h)
n, err := io.Copy(w, reader)
if err != nil {
return nil, fmt.Errorf("writing file: %w", err)
}
if err := tmp.Close(); err != nil {
return nil, fmt.Errorf("closing temp file: %w", err)
}
if err := os.Rename(tmpPath, dest); err != nil {
return nil, fmt.Errorf("renaming temp file: %w", err)
}
return &PutResult{
Key: key,
Size: n,
Checksum: hex.EncodeToString(h.Sum(nil)),
}, nil
}
// Get opens the file at key for reading.
func (fs *FilesystemStore) Get(_ context.Context, key string) (io.ReadCloser, error) {
f, err := os.Open(fs.path(key))
if err != nil {
return nil, fmt.Errorf("opening file: %w", err)
}
return f, nil
}
// GetVersion delegates to Get — filesystem storage has no versioning.
func (fs *FilesystemStore) GetVersion(ctx context.Context, key string, _ string) (io.ReadCloser, error) {
return fs.Get(ctx, key)
}
// Delete removes the file at key. No error if already absent.
func (fs *FilesystemStore) Delete(_ context.Context, key string) error {
err := os.Remove(fs.path(key))
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing file: %w", err)
}
return nil
}
// Exists reports whether the file at key exists.
func (fs *FilesystemStore) Exists(_ context.Context, key string) (bool, error) {
_, err := os.Stat(fs.path(key))
if err == nil {
return true, nil
}
if errors.Is(err, os.ErrNotExist) {
return false, nil
}
return false, fmt.Errorf("checking file: %w", err)
}
// Copy duplicates a file from srcKey to dstKey using atomic rename.
func (fs *FilesystemStore) Copy(_ context.Context, srcKey, dstKey string) error {
srcPath := fs.path(srcKey)
dstPath := fs.path(dstKey)
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("opening source: %w", err)
}
defer src.Close()
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
return fmt.Errorf("creating directories: %w", err)
}
tmp, err := os.CreateTemp(filepath.Dir(dstPath), ".silo-tmp-*")
if err != nil {
return fmt.Errorf("creating temp file: %w", err)
}
tmpPath := tmp.Name()
defer func() {
tmp.Close()
os.Remove(tmpPath)
}()
if _, err := io.Copy(tmp, src); err != nil {
return fmt.Errorf("copying file: %w", err)
}
if err := tmp.Close(); err != nil {
return fmt.Errorf("closing temp file: %w", err)
}
if err := os.Rename(tmpPath, dstPath); err != nil {
return fmt.Errorf("renaming temp file: %w", err)
}
return nil
}
// PresignPut is not supported by the filesystem backend.
func (fs *FilesystemStore) PresignPut(_ context.Context, _ string, _ time.Duration) (*url.URL, error) {
return nil, ErrPresignNotSupported
}
// Ping verifies the root directory is accessible and writable.
func (fs *FilesystemStore) Ping(_ context.Context) error {
tmp, err := os.CreateTemp(fs.root, ".silo-ping-*")
if err != nil {
return fmt.Errorf("storage ping failed: %w", err)
}
name := tmp.Name()
tmp.Close()
os.Remove(name)
return nil
}

View File

@@ -0,0 +1,277 @@
package storage
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"io"
"os"
"path/filepath"
"strings"
"testing"
)
func newTestStore(t *testing.T) *FilesystemStore {
t.Helper()
fs, err := NewFilesystemStore(t.TempDir())
if err != nil {
t.Fatalf("NewFilesystemStore: %v", err)
}
return fs
}
func TestNewFilesystemStore(t *testing.T) {
dir := t.TempDir()
sub := filepath.Join(dir, "a", "b")
fs, err := NewFilesystemStore(sub)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !filepath.IsAbs(fs.root) {
t.Errorf("root is not absolute: %s", fs.root)
}
info, err := os.Stat(sub)
if err != nil {
t.Fatalf("root dir missing: %v", err)
}
if !info.IsDir() {
t.Error("root is not a directory")
}
}
func TestPut(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
data := []byte("hello world")
h := sha256.Sum256(data)
wantChecksum := hex.EncodeToString(h[:])
result, err := fs.Put(ctx, "items/P001/rev1.FCStd", bytes.NewReader(data), int64(len(data)), "application/octet-stream")
if err != nil {
t.Fatalf("Put: %v", err)
}
if result.Key != "items/P001/rev1.FCStd" {
t.Errorf("Key = %q, want %q", result.Key, "items/P001/rev1.FCStd")
}
if result.Size != int64(len(data)) {
t.Errorf("Size = %d, want %d", result.Size, len(data))
}
if result.Checksum != wantChecksum {
t.Errorf("Checksum = %q, want %q", result.Checksum, wantChecksum)
}
// Verify file on disk.
got, err := os.ReadFile(fs.path("items/P001/rev1.FCStd"))
if err != nil {
t.Fatalf("reading file: %v", err)
}
if !bytes.Equal(got, data) {
t.Error("file content mismatch")
}
}
func TestPutAtomicity(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
key := "test/atomic.bin"
// Write an initial file.
if _, err := fs.Put(ctx, key, strings.NewReader("original"), 8, ""); err != nil {
t.Fatalf("initial Put: %v", err)
}
// Write with a reader that fails partway through.
failing := io.MultiReader(strings.NewReader("partial"), &errReader{})
_, err := fs.Put(ctx, key, failing, 100, "")
if err == nil {
t.Fatal("expected error from failing reader")
}
// Original file should still be intact.
got, err := os.ReadFile(fs.path(key))
if err != nil {
t.Fatalf("reading file after failed put: %v", err)
}
if string(got) != "original" {
t.Errorf("file content = %q, want %q", got, "original")
}
}
type errReader struct{}
func (e *errReader) Read([]byte) (int, error) {
return 0, io.ErrUnexpectedEOF
}
func TestGet(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
data := []byte("test content")
if _, err := fs.Put(ctx, "f.txt", bytes.NewReader(data), int64(len(data)), ""); err != nil {
t.Fatalf("Put: %v", err)
}
rc, err := fs.Get(ctx, "f.txt")
if err != nil {
t.Fatalf("Get: %v", err)
}
defer rc.Close()
got, err := io.ReadAll(rc)
if err != nil {
t.Fatalf("ReadAll: %v", err)
}
if !bytes.Equal(got, data) {
t.Error("content mismatch")
}
}
func TestGetMissing(t *testing.T) {
fs := newTestStore(t)
_, err := fs.Get(context.Background(), "no/such/file")
if err == nil {
t.Fatal("expected error for missing file")
}
}
func TestGetVersion(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
data := []byte("versioned")
if _, err := fs.Put(ctx, "v.txt", bytes.NewReader(data), int64(len(data)), ""); err != nil {
t.Fatalf("Put: %v", err)
}
// GetVersion ignores versionID, returns same file.
rc, err := fs.GetVersion(ctx, "v.txt", "ignored-version-id")
if err != nil {
t.Fatalf("GetVersion: %v", err)
}
defer rc.Close()
got, err := io.ReadAll(rc)
if err != nil {
t.Fatalf("ReadAll: %v", err)
}
if !bytes.Equal(got, data) {
t.Error("content mismatch")
}
}
func TestDelete(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
if _, err := fs.Put(ctx, "del.txt", strings.NewReader("x"), 1, ""); err != nil {
t.Fatalf("Put: %v", err)
}
if err := fs.Delete(ctx, "del.txt"); err != nil {
t.Fatalf("Delete: %v", err)
}
if _, err := os.Stat(fs.path("del.txt")); !os.IsNotExist(err) {
t.Error("file still exists after delete")
}
}
func TestDeleteMissing(t *testing.T) {
fs := newTestStore(t)
if err := fs.Delete(context.Background(), "no/such/file"); err != nil {
t.Fatalf("Delete missing file should not error: %v", err)
}
}
func TestExists(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
ok, err := fs.Exists(ctx, "nope")
if err != nil {
t.Fatalf("Exists: %v", err)
}
if ok {
t.Error("Exists returned true for missing file")
}
if _, err := fs.Put(ctx, "yes.txt", strings.NewReader("y"), 1, ""); err != nil {
t.Fatalf("Put: %v", err)
}
ok, err = fs.Exists(ctx, "yes.txt")
if err != nil {
t.Fatalf("Exists: %v", err)
}
if !ok {
t.Error("Exists returned false for existing file")
}
}
func TestCopy(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
data := []byte("copy me")
if _, err := fs.Put(ctx, "src.bin", bytes.NewReader(data), int64(len(data)), ""); err != nil {
t.Fatalf("Put: %v", err)
}
if err := fs.Copy(ctx, "src.bin", "deep/nested/dst.bin"); err != nil {
t.Fatalf("Copy: %v", err)
}
got, err := os.ReadFile(fs.path("deep/nested/dst.bin"))
if err != nil {
t.Fatalf("reading copied file: %v", err)
}
if !bytes.Equal(got, data) {
t.Error("copied content mismatch")
}
// Source should still exist.
if _, err := os.Stat(fs.path("src.bin")); err != nil {
t.Error("source file missing after copy")
}
}
func TestPresignPut(t *testing.T) {
fs := newTestStore(t)
_, err := fs.PresignPut(context.Background(), "key", 5*60)
if err != ErrPresignNotSupported {
t.Errorf("PresignPut error = %v, want ErrPresignNotSupported", err)
}
}
func TestPing(t *testing.T) {
fs := newTestStore(t)
if err := fs.Ping(context.Background()); err != nil {
t.Fatalf("Ping: %v", err)
}
}
func TestPingBadRoot(t *testing.T) {
fs := &FilesystemStore{root: "/nonexistent/path/that/should/not/exist"}
if err := fs.Ping(context.Background()); err == nil {
t.Fatal("expected Ping to fail with invalid root")
}
}
func TestPutOverwrite(t *testing.T) {
fs := newTestStore(t)
ctx := context.Background()
if _, err := fs.Put(ctx, "ow.txt", strings.NewReader("first"), 5, ""); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := fs.Put(ctx, "ow.txt", strings.NewReader("second"), 6, ""); err != nil {
t.Fatalf("Put overwrite: %v", err)
}
got, _ := os.ReadFile(fs.path("ow.txt"))
if string(got) != "second" {
t.Errorf("content = %q, want %q", got, "second")
}
}

View File

@@ -0,0 +1,21 @@
// Package storage defines the FileStore interface and backend implementations.
package storage
import (
"context"
"io"
"net/url"
"time"
)
// FileStore is the interface for file storage backends.
type FileStore interface {
Put(ctx context.Context, key string, reader io.Reader, size int64, contentType string) (*PutResult, error)
Get(ctx context.Context, key string) (io.ReadCloser, error)
GetVersion(ctx context.Context, key string, versionID string) (io.ReadCloser, error)
Delete(ctx context.Context, key string) error
Exists(ctx context.Context, key string) (bool, error)
Copy(ctx context.Context, srcKey, dstKey string) error
PresignPut(ctx context.Context, key string, expiry time.Duration) (*url.URL, error)
Ping(ctx context.Context) error
}

View File

@@ -1,4 +1,3 @@
// Package storage provides MinIO file storage operations.
package storage
import (
@@ -22,6 +21,9 @@ type Config struct {
Region string
}
// Compile-time check: *Storage implements FileStore.
var _ FileStore = (*Storage)(nil)
// Storage wraps MinIO client operations.
type Storage struct {
client *minio.Client
@@ -112,6 +114,19 @@ func (s *Storage) Delete(ctx context.Context, key string) error {
return nil
}
// Exists checks if an object exists in storage.
func (s *Storage) Exists(ctx context.Context, key string) (bool, error) {
_, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{})
if err != nil {
resp := minio.ToErrorResponse(err)
if resp.Code == "NoSuchKey" {
return false, nil
}
return false, fmt.Errorf("checking object existence: %w", err)
}
return true, nil
}
// Ping checks if the storage backend is reachable by verifying the bucket exists.
func (s *Storage) Ping(ctx context.Context) error {
_, err := s.client.BucketExists(ctx, s.bucket)