- Add 023_edit_sessions.sql migration with unique index on (item_id, context_level, object_id) for hard interference
- Add EditSessionRepository with Acquire, Release, ReleaseForWorkstation, GetByID, ListForItem, ListForUser, TouchHeartbeat, ExpireStale, GetConflict
- Add 4 handlers: acquire (POST), release (DELETE), list by item (GET), list by user (GET)
- Acquire auto-computes dependency_cone from DAG forward cone when available
- Hard interference returns 409 with holder info (username, workstation, context_level, object_id, acquired_at)
- Publish edit.session_acquired and edit.session_released via item-scoped SSE
- Add /api/edit-sessions (user scope) and /api/items/{pn}/edit-sessions (item scope) routes
Closes #163
223 lines
6.8 KiB
Go
223 lines
6.8 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// EditSession represents an active editing context.
|
|
type EditSession struct {
|
|
ID string
|
|
ItemID string
|
|
UserID string
|
|
WorkstationID string
|
|
ContextLevel string
|
|
ObjectID *string
|
|
DependencyCone []string
|
|
AcquiredAt time.Time
|
|
LastHeartbeat time.Time
|
|
}
|
|
|
|
// EditSessionRepository provides edit session database operations.
|
|
type EditSessionRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
// NewEditSessionRepository creates a new edit session repository.
|
|
func NewEditSessionRepository(db *DB) *EditSessionRepository {
|
|
return &EditSessionRepository{db: db}
|
|
}
|
|
|
|
// Acquire inserts a new edit session. Returns a unique constraint error
|
|
// if another session already holds the same (item_id, context_level, object_id).
|
|
func (r *EditSessionRepository) Acquire(ctx context.Context, s *EditSession) error {
|
|
return r.db.pool.QueryRow(ctx, `
|
|
INSERT INTO edit_sessions (item_id, user_id, workstation_id, context_level, object_id, dependency_cone)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING id, acquired_at, last_heartbeat
|
|
`, s.ItemID, s.UserID, s.WorkstationID, s.ContextLevel, s.ObjectID, s.DependencyCone).
|
|
Scan(&s.ID, &s.AcquiredAt, &s.LastHeartbeat)
|
|
}
|
|
|
|
// Release deletes an edit session by ID.
|
|
func (r *EditSessionRepository) Release(ctx context.Context, id string) error {
|
|
_, err := r.db.pool.Exec(ctx, `DELETE FROM edit_sessions WHERE id = $1`, id)
|
|
return err
|
|
}
|
|
|
|
// ReleaseForWorkstation deletes all sessions for a workstation, returning
|
|
// the released sessions so callers can publish SSE notifications.
|
|
func (r *EditSessionRepository) ReleaseForWorkstation(ctx context.Context, workstationID string) ([]EditSession, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
DELETE FROM edit_sessions
|
|
WHERE workstation_id = $1
|
|
RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat
|
|
`, workstationID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []EditSession
|
|
for rows.Next() {
|
|
var s EditSession
|
|
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
|
|
return nil, err
|
|
}
|
|
sessions = append(sessions, s)
|
|
}
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
// GetByID returns an edit session by its ID.
|
|
func (r *EditSessionRepository) GetByID(ctx context.Context, id string) (*EditSession, error) {
|
|
s := &EditSession{}
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
|
|
dependency_cone, acquired_at, last_heartbeat
|
|
FROM edit_sessions
|
|
WHERE id = $1
|
|
`, id).Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat)
|
|
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// ListForItem returns all active edit sessions for an item.
|
|
func (r *EditSessionRepository) ListForItem(ctx context.Context, itemID string) ([]*EditSession, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
|
|
dependency_cone, acquired_at, last_heartbeat
|
|
FROM edit_sessions
|
|
WHERE item_id = $1
|
|
ORDER BY acquired_at
|
|
`, itemID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []*EditSession
|
|
for rows.Next() {
|
|
s := &EditSession{}
|
|
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
|
|
return nil, err
|
|
}
|
|
sessions = append(sessions, s)
|
|
}
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
// ListForUser returns all active edit sessions for a user.
|
|
func (r *EditSessionRepository) ListForUser(ctx context.Context, userID string) ([]*EditSession, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
|
|
dependency_cone, acquired_at, last_heartbeat
|
|
FROM edit_sessions
|
|
WHERE user_id = $1
|
|
ORDER BY acquired_at
|
|
`, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []*EditSession
|
|
for rows.Next() {
|
|
s := &EditSession{}
|
|
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
|
|
return nil, err
|
|
}
|
|
sessions = append(sessions, s)
|
|
}
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
// TouchHeartbeat updates last_heartbeat for all sessions of a workstation.
|
|
func (r *EditSessionRepository) TouchHeartbeat(ctx context.Context, workstationID string) error {
|
|
_, err := r.db.pool.Exec(ctx, `
|
|
UPDATE edit_sessions SET last_heartbeat = now() WHERE workstation_id = $1
|
|
`, workstationID)
|
|
return err
|
|
}
|
|
|
|
// ExpireStale deletes sessions whose last_heartbeat is older than the given
|
|
// timeout, returning the expired sessions for SSE notification.
|
|
func (r *EditSessionRepository) ExpireStale(ctx context.Context, timeout time.Duration) ([]EditSession, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
DELETE FROM edit_sessions
|
|
WHERE last_heartbeat < now() - $1::interval
|
|
RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat
|
|
`, timeout.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []EditSession
|
|
for rows.Next() {
|
|
var s EditSession
|
|
if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat); err != nil {
|
|
return nil, err
|
|
}
|
|
sessions = append(sessions, s)
|
|
}
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
// GetConflict returns the existing session holding a given (item, context_level, object_id)
|
|
// slot, for building 409 conflict responses.
|
|
func (r *EditSessionRepository) GetConflict(ctx context.Context, itemID, contextLevel string, objectID *string) (*EditSession, error) {
|
|
s := &EditSession{}
|
|
var query string
|
|
var args []any
|
|
|
|
if objectID != nil {
|
|
query = `
|
|
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
|
|
dependency_cone, acquired_at, last_heartbeat
|
|
FROM edit_sessions
|
|
WHERE item_id = $1 AND context_level = $2 AND object_id = $3
|
|
`
|
|
args = []any{itemID, contextLevel, *objectID}
|
|
} else {
|
|
query = `
|
|
SELECT id, item_id, user_id, workstation_id, context_level, object_id,
|
|
dependency_cone, acquired_at, last_heartbeat
|
|
FROM edit_sessions
|
|
WHERE item_id = $1 AND context_level = $2 AND object_id IS NULL
|
|
`
|
|
args = []any{itemID, contextLevel}
|
|
}
|
|
|
|
err := r.db.pool.QueryRow(ctx, query, args...).Scan(
|
|
&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID,
|
|
&s.ContextLevel, &s.ObjectID, &s.DependencyCone,
|
|
&s.AcquiredAt, &s.LastHeartbeat)
|
|
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|