Add server-sent events at GET /api/events for live mutation notifications.
Add server mode (normal/read-only/degraded) exposed via /health, /ready, and SSE server.state events.

New files:
- broker.go: SSE event hub with client management, non-blocking fan-out, ring buffer history for Last-Event-ID replay, and heartbeat
- servermode.go: mode state machine with periodic MinIO health check and SIGUSR1 read-only toggle
- sse_handler.go: HTTP handler using http.Flusher and ResponseController to disable WriteTimeout for long-lived SSE
- broker_test.go, servermode_test.go: 13 unit tests

Modified:
- handlers.go: Server struct gains broker/serverState fields, Health/Ready include mode and sse_clients, write handlers emit item.created/updated/deleted and revision.created events
- routes.go: register GET /api/events, add RequireWritable middleware to all 8 editor-gated route groups
- middleware.go: RequireWritable returns 503 in read-only mode
- csv.go, ods.go: emit bulk item.created events after import
- storage.go: add Ping() method for health checks
- config.go: add ReadOnly field to ServerConfig
- main.go: create broker/state, start background goroutines, SIGUSR1 handler, graceful shutdown sequence

Closes #38, closes #39
160 lines
4.3 KiB
Go
160 lines
4.3 KiB
Go
// Package storage provides MinIO file storage operations.
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
)
|
|
|
|
// Config describes how to reach a MinIO server and which bucket to use.
// It is consumed by Connect.
type Config struct {
	// Endpoint is the server address handed to minio.New.
	Endpoint string
	// AccessKey and SecretKey are the static V4 credentials used to sign requests.
	AccessKey, SecretKey string
	// Bucket is the bucket Connect checks for (and creates when missing).
	Bucket string
	// UseSSL is forwarded as the client's Secure option.
	UseSSL bool
	// Region is set on the client and used when the bucket has to be created.
	Region string
}
|
|
|
|
// Storage wraps MinIO client operations.
|
|
type Storage struct {
|
|
client *minio.Client
|
|
bucket string
|
|
}
|
|
|
|
// Connect creates a new MinIO storage client.
|
|
func Connect(ctx context.Context, cfg Config) (*Storage, error) {
|
|
client, err := minio.New(cfg.Endpoint, &minio.Options{
|
|
Creds: credentials.NewStaticV4(cfg.AccessKey, cfg.SecretKey, ""),
|
|
Secure: cfg.UseSSL,
|
|
Region: cfg.Region,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating minio client: %w", err)
|
|
}
|
|
|
|
// Ensure bucket exists with versioning
|
|
exists, err := client.BucketExists(ctx, cfg.Bucket)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("checking bucket: %w", err)
|
|
}
|
|
if !exists {
|
|
if err := client.MakeBucket(ctx, cfg.Bucket, minio.MakeBucketOptions{
|
|
Region: cfg.Region,
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("creating bucket: %w", err)
|
|
}
|
|
// Enable versioning
|
|
if err := client.EnableVersioning(ctx, cfg.Bucket); err != nil {
|
|
return nil, fmt.Errorf("enabling versioning: %w", err)
|
|
}
|
|
}
|
|
|
|
return &Storage{client: client, bucket: cfg.Bucket}, nil
|
|
}
|
|
|
|
// PutResult describes an object after a successful Put.
type PutResult struct {
	// Key is the object key that was written; VersionID is the version
	// identifier reported by the server for this upload.
	Key, VersionID string
	// Size is the object size in bytes as reported by the upload.
	Size int64
	// Checksum is the SHA-256 checksum reported by the upload.
	Checksum string
}
|
|
|
|
// Put uploads a file to storage.
|
|
func (s *Storage) Put(ctx context.Context, key string, reader io.Reader, size int64, contentType string) (*PutResult, error) {
|
|
info, err := s.client.PutObject(ctx, s.bucket, key, reader, size, minio.PutObjectOptions{
|
|
ContentType: contentType,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("uploading object: %w", err)
|
|
}
|
|
|
|
return &PutResult{
|
|
Key: key,
|
|
VersionID: info.VersionID,
|
|
Size: info.Size,
|
|
Checksum: info.ChecksumSHA256,
|
|
}, nil
|
|
}
|
|
|
|
// Get downloads a file from storage.
|
|
func (s *Storage) Get(ctx context.Context, key string) (io.ReadCloser, error) {
|
|
obj, err := s.client.GetObject(ctx, s.bucket, key, minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting object: %w", err)
|
|
}
|
|
return obj, nil
|
|
}
|
|
|
|
// GetVersion downloads a specific version of a file.
|
|
func (s *Storage) GetVersion(ctx context.Context, key, versionID string) (io.ReadCloser, error) {
|
|
obj, err := s.client.GetObject(ctx, s.bucket, key, minio.GetObjectOptions{
|
|
VersionID: versionID,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting object version: %w", err)
|
|
}
|
|
return obj, nil
|
|
}
|
|
|
|
// Delete removes a file from storage.
|
|
func (s *Storage) Delete(ctx context.Context, key string) error {
|
|
if err := s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}); err != nil {
|
|
return fmt.Errorf("removing object: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ping checks if the storage backend is reachable by verifying the bucket exists.
|
|
func (s *Storage) Ping(ctx context.Context) error {
|
|
_, err := s.client.BucketExists(ctx, s.bucket)
|
|
return err
|
|
}
|
|
|
|
// Bucket returns the bucket name.
|
|
func (s *Storage) Bucket() string {
|
|
return s.bucket
|
|
}
|
|
|
|
// PresignPut generates a presigned PUT URL for direct browser upload.
|
|
func (s *Storage) PresignPut(ctx context.Context, key string, expiry time.Duration) (*url.URL, error) {
|
|
u, err := s.client.PresignedPutObject(ctx, s.bucket, key, expiry)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("generating presigned put URL: %w", err)
|
|
}
|
|
return u, nil
|
|
}
|
|
|
|
// Copy copies an object within the same bucket from srcKey to dstKey.
|
|
func (s *Storage) Copy(ctx context.Context, srcKey, dstKey string) error {
|
|
src := minio.CopySrcOptions{
|
|
Bucket: s.bucket,
|
|
Object: srcKey,
|
|
}
|
|
dst := minio.CopyDestOptions{
|
|
Bucket: s.bucket,
|
|
Object: dstKey,
|
|
}
|
|
if _, err := s.client.CopyObject(ctx, dst, src); err != nil {
|
|
return fmt.Errorf("copying object: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FileKey builds the storage key of an item's file for a given revision, in
// the form "items/<partNumber>/rev<revision>.FCStd". The part number is
// inserted verbatim.
func FileKey(partNumber string, revision int) string {
	const layout = "items/%s/rev%d.FCStd"
	return fmt.Sprintf(layout, partNumber, revision)
}
|
|
|
|
// ThumbnailKey builds the storage key of an item's thumbnail for a given
// revision, in the form "thumbnails/<partNumber>/rev<revision>.png". The part
// number is inserted verbatim.
func ThumbnailKey(partNumber string, revision int) string {
	const layout = "thumbnails/%s/rev%d.png"
	return fmt.Sprintf(layout, partNumber, revision)
}
|