Files
silo/internal/db/edit_sessions.go
Forbes 68c9acea5c feat(sessions): edit session acquire, release, and query endpoints
- Add 023_edit_sessions.sql migration with unique index on (item_id, context_level, object_id) for hard interference
- Add EditSessionRepository with Acquire, Release, ReleaseForWorkstation, GetByID, ListForItem, ListForUser, TouchHeartbeat, ExpireStale, GetConflict
- Add 4 handlers: acquire (POST), release (DELETE), list by item (GET), list by user (GET)
- Acquire auto-computes dependency_cone from DAG forward cone when available
- Hard interference returns 409 with holder info (username, workstation, context_level, object_id, acquired_at)
- Publish edit.session_acquired and edit.session_released via item-scoped SSE
- Add /api/edit-sessions (user scope) and /api/items/{pn}/edit-sessions (item scope) routes

Closes #163
2026-03-01 13:40:18 -06:00

223 lines
6.8 KiB
Go

package db
import (
"context"
"time"
"github.com/jackc/pgx/v5"
)
// EditSession represents an active editing context.
type EditSession struct {
ID string
ItemID string
UserID string
WorkstationID string
ContextLevel string
ObjectID *string
DependencyCone []string
AcquiredAt time.Time
LastHeartbeat time.Time
}
// EditSessionRepository provides edit session database operations.
type EditSessionRepository struct {
db *DB
}
// NewEditSessionRepository creates a new edit session repository.
func NewEditSessionRepository(db *DB) *EditSessionRepository {
return &EditSessionRepository{db: db}
}
// Acquire inserts a new edit session. Returns a unique constraint error
// if another session already holds the same (item_id, context_level, object_id).
func (r *EditSessionRepository) Acquire(ctx context.Context, s *EditSession) error {
return r.db.pool.QueryRow(ctx, `
INSERT INTO edit_sessions (item_id, user_id, workstation_id, context_level, object_id, dependency_cone)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, acquired_at, last_heartbeat
`, s.ItemID, s.UserID, s.WorkstationID, s.ContextLevel, s.ObjectID, s.DependencyCone).
Scan(&s.ID, &s.AcquiredAt, &s.LastHeartbeat)
}
// Release deletes an edit session by ID.
func (r *EditSessionRepository) Release(ctx context.Context, id string) error {
_, err := r.db.pool.Exec(ctx, `DELETE FROM edit_sessions WHERE id = $1`, id)
return err
}
// ReleaseForWorkstation deletes all sessions for a workstation, returning
// the released sessions so callers can publish SSE notifications.
func (r *EditSessionRepository) ReleaseForWorkstation(ctx context.Context, workstationID string) ([]EditSession, error) {
rows, err := r.db.pool.Query(ctx, `
DELETE FROM edit_sessions
WHERE workstation_id = $1
RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat
`, workstationID)
if err != nil {
return nil, err
}
defer rows.Close()
var sessions []EditSession
for rows.Next() {
var s EditSession
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
return nil, err
}
sessions = append(sessions, s)
}
return sessions, rows.Err()
}
// GetByID returns an edit session by its ID.
func (r *EditSessionRepository) GetByID(ctx context.Context, id string) (*EditSession, error) {
s := &EditSession{}
err := r.db.pool.QueryRow(ctx, `
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
dependency_cone, acquired_at, last_heartbeat
FROM edit_sessions
WHERE id = $1
`, id).Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat)
if err == pgx.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return s, nil
}
// ListForItem returns all active edit sessions for an item.
func (r *EditSessionRepository) ListForItem(ctx context.Context, itemID string) ([]*EditSession, error) {
rows, err := r.db.pool.Query(ctx, `
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
dependency_cone, acquired_at, last_heartbeat
FROM edit_sessions
WHERE item_id = $1
ORDER BY acquired_at
`, itemID)
if err != nil {
return nil, err
}
defer rows.Close()
var sessions []*EditSession
for rows.Next() {
s := &EditSession{}
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
return nil, err
}
sessions = append(sessions, s)
}
return sessions, rows.Err()
}
// ListForUser returns all active edit sessions for a user.
func (r *EditSessionRepository) ListForUser(ctx context.Context, userID string) ([]*EditSession, error) {
rows, err := r.db.pool.Query(ctx, `
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
dependency_cone, acquired_at, last_heartbeat
FROM edit_sessions
WHERE user_id = $1
ORDER BY acquired_at
`, userID)
if err != nil {
return nil, err
}
defer rows.Close()
var sessions []*EditSession
for rows.Next() {
s := &EditSession{}
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
return nil, err
}
sessions = append(sessions, s)
}
return sessions, rows.Err()
}
// TouchHeartbeat updates last_heartbeat for all sessions of a workstation.
func (r *EditSessionRepository) TouchHeartbeat(ctx context.Context, workstationID string) error {
_, err := r.db.pool.Exec(ctx, `
UPDATE edit_sessions SET last_heartbeat = now() WHERE workstation_id = $1
`, workstationID)
return err
}
// ExpireStale deletes sessions whose last_heartbeat is older than the given
// timeout, returning the expired sessions for SSE notification.
func (r *EditSessionRepository) ExpireStale(ctx context.Context, timeout time.Duration) ([]EditSession, error) {
rows, err := r.db.pool.Query(ctx, `
DELETE FROM edit_sessions
WHERE last_heartbeat < now() - $1::interval
RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat
`, timeout.String())
if err != nil {
return nil, err
}
defer rows.Close()
var sessions []EditSession
for rows.Next() {
var s EditSession
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
return nil, err
}
sessions = append(sessions, s)
}
return sessions, rows.Err()
}
// GetConflict returns the existing session holding a given (item, context_level, object_id)
// slot, for building 409 conflict responses.
func (r *EditSessionRepository) GetConflict(ctx context.Context, itemID, contextLevel string, objectID *string) (*EditSession, error) {
s := &EditSession{}
var query string
var args []any
if objectID != nil {
query = `
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
dependency_cone, acquired_at, last_heartbeat
FROM edit_sessions
WHERE item_id = $1 AND context_level = $2 AND object_id = $3
`
args = []any{itemID, contextLevel, *objectID}
} else {
query = `
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
dependency_cone, acquired_at, last_heartbeat
FROM edit_sessions
WHERE item_id = $1 AND context_level = $2 AND object_id IS NULL
`
args = []any{itemID, contextLevel}
}
err := r.db.pool.QueryRow(ctx, query, args...).Scan(
&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
&s.AcquiredAt, &s.LastHeartbeat)
if err == pgx.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return s, nil
}