diff --git a/internal/api/handlers.go b/internal/api/handlers.go index a9331de..759bab4 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -63,6 +63,7 @@ type Server struct { workflows map[string]*workflow.Workflow solverResults *db.SolverResultRepository workstations *db.WorkstationRepository + editSessions *db.EditSessionRepository } // NewServer creates a new API server. @@ -98,6 +99,7 @@ func NewServer( itemApprovals := db.NewItemApprovalRepository(database) solverResults := db.NewSolverResultRepository(database) workstations := db.NewWorkstationRepository(database) + editSessions := db.NewEditSessionRepository(database) seqStore := &dbSequenceStore{db: database, schemas: schemas} partgen := partnum.NewGenerator(schemas, seqStore) @@ -133,6 +135,7 @@ func NewServer( workflows: workflows, solverResults: solverResults, workstations: workstations, + editSessions: editSessions, } } diff --git a/internal/api/routes.go b/internal/api/routes.go index b2bc8b7..98f32aa 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -79,6 +79,12 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Delete("/{id}", server.HandleDeleteWorkstation) }) + // Edit sessions — current user's active sessions (gated by sessions module) + r.Route("/edit-sessions", func(r chi.Router) { + r.Use(server.RequireModule("sessions")) + r.Get("/", server.HandleListUserEditSessions) + }) + // Auth endpoints r.Get("/auth/me", server.HandleGetCurrentUser) r.Route("/auth/tokens", func(r chi.Router) { @@ -206,6 +212,19 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { }) }) + // Edit sessions (gated by sessions module) + r.Route("/edit-sessions", func(r chi.Router) { + r.Use(server.RequireModule("sessions")) + r.Get("/", server.HandleListItemEditSessions) + + r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) + r.Use(server.RequireRole(auth.RoleEditor)) + r.Post("/", server.HandleAcquireEditSession) + r.Delete("/{sessionID}", server.HandleReleaseEditSession) + }) + }) + r.Group(func(r chi.Router) { r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) diff --git a/internal/api/session_handlers.go b/internal/api/session_handlers.go new file mode 100644 index 0000000..63bab7f --- /dev/null +++ b/internal/api/session_handlers.go @@ -0,0 +1,293 @@ +package api + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/kindredsystems/silo/internal/auth" + "github.com/kindredsystems/silo/internal/db" + "github.com/kindredsystems/silo/internal/modules" +) + +var validContextLevels = map[string]bool{ + "sketch": true, + "partdesign": true, + "assembly": true, +} + +type editSessionResponse struct { + ID string `json:"id"` + ItemID string `json:"item_id"` + PartNumber string `json:"part_number,omitempty"` + UserID string `json:"user_id"` + WorkstationID string `json:"workstation_id"` + ContextLevel string `json:"context_level"` + ObjectID *string `json:"object_id"` + DependCone []string `json:"dependency_cone"` + AcquiredAt string `json:"acquired_at"` + LastHeartbeat string `json:"last_heartbeat"` +} + +func sessionToResponse(s *db.EditSession, partNumber string) editSessionResponse { + cone := s.DependencyCone + if cone == nil { + cone = []string{} + } + return editSessionResponse{ + ID: s.ID, + ItemID: s.ItemID, + PartNumber: partNumber, + UserID: s.UserID, + WorkstationID: s.WorkstationID, + ContextLevel: s.ContextLevel, + ObjectID: s.ObjectID, + DependCone: cone, + AcquiredAt: s.AcquiredAt.UTC().Format("2006-01-02T15:04:05Z"), + LastHeartbeat: s.LastHeartbeat.UTC().Format("2006-01-02T15:04:05Z"), + } +} + +// HandleAcquireEditSession acquires an edit session on an item. +// POST /api/items/{partNumber}/edit-sessions +func (s *Server) HandleAcquireEditSession(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get item") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get item") + return + } + if item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + user := auth.UserFromContext(ctx) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Authentication required") + return + } + + var req struct { + WorkstationID string `json:"workstation_id"` + ContextLevel string `json:"context_level"` + ObjectID *string `json:"object_id"` + DependencyCone []string `json:"dependency_cone"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_json", err.Error()) + return + } + if req.WorkstationID == "" { + writeError(w, http.StatusBadRequest, "validation_error", "workstation_id is required") + return + } + if !validContextLevels[req.ContextLevel] { + writeError(w, http.StatusBadRequest, "validation_error", "context_level must be sketch, partdesign, or assembly") + return + } + + // If no dependency cone provided and DAG module is enabled, attempt to compute it. + depCone := req.DependencyCone + if len(depCone) == 0 && req.ObjectID != nil && s.modules.IsEnabled(modules.DAG) { + node, nodeErr := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, *req.ObjectID) + if nodeErr == nil && node != nil { + coneNodes, coneErr := s.dag.GetForwardCone(ctx, node.ID) + if coneErr == nil { + depCone = make([]string, len(coneNodes)) + for i, n := range coneNodes { + depCone[i] = n.NodeKey + } + } + } + } + + session := &db.EditSession{ + ItemID: item.ID, + UserID: user.ID, + WorkstationID: req.WorkstationID, + ContextLevel: req.ContextLevel, + ObjectID: req.ObjectID, + DependencyCone: depCone, + } + + if err := s.editSessions.Acquire(ctx, session); err != nil { + // Check for unique constraint violation (hard interference). + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" { + s.writeConflictResponse(w, r, item.ID, req.ContextLevel, req.ObjectID) + return + } + s.logger.Error().Err(err).Msg("failed to acquire edit session") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to acquire edit session") + return + } + + s.broker.PublishToItem(item.ID, "edit.session_acquired", mustMarshal(map[string]any{ + "session_id": session.ID, + "item_id": item.ID, + "part_number": partNumber, + "user": user.Username, + "workstation": req.WorkstationID, + "context_level": session.ContextLevel, + "object_id": session.ObjectID, + })) + + writeJSON(w, http.StatusOK, sessionToResponse(session, partNumber)) +} + +// writeConflictResponse builds a 409 response with holder info. +func (s *Server) writeConflictResponse(w http.ResponseWriter, r *http.Request, itemID, contextLevel string, objectID *string) { + ctx := r.Context() + conflict, err := s.editSessions.GetConflict(ctx, itemID, contextLevel, objectID) + if err != nil || conflict == nil { + writeError(w, http.StatusConflict, "hard_interference", "Another user is editing this object") + return + } + + // Look up holder's username and workstation name. + holderUser := "unknown" + if u, err := s.auth.GetUserByID(ctx, conflict.UserID); err == nil && u != nil { + holderUser = u.Username + } + holderWS := conflict.WorkstationID + if ws, err := s.workstations.GetByID(ctx, conflict.WorkstationID); err == nil && ws != nil { + holderWS = ws.Name + } + + objDesc := contextLevel + if objectID != nil { + objDesc = *objectID + } + + writeJSON(w, http.StatusConflict, map[string]any{ + "error": "hard_interference", + "holder": map[string]any{ + "user": holderUser, + "workstation": holderWS, + "context_level": conflict.ContextLevel, + "object_id": conflict.ObjectID, + "acquired_at": conflict.AcquiredAt.UTC().Format("2006-01-02T15:04:05Z"), + }, + "message": fmt.Sprintf("%s is currently editing %s", holderUser, objDesc), + }) +} + +// HandleReleaseEditSession releases an edit session. +// DELETE /api/items/{partNumber}/edit-sessions/{sessionID} +func (s *Server) HandleReleaseEditSession(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + sessionID := chi.URLParam(r, "sessionID") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get item") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get item") + return + } + if item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + user := auth.UserFromContext(ctx) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Authentication required") + return + } + + session, err := s.editSessions.GetByID(ctx, sessionID) + if err != nil { + s.logger.Error().Err(err).Str("session_id", sessionID).Msg("failed to get edit session") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get edit session") + return + } + if session == nil { + writeError(w, http.StatusNotFound, "not_found", "Edit session not found") + return + } + + if session.UserID != user.ID && user.Role != auth.RoleAdmin { + writeError(w, http.StatusForbidden, "forbidden", "You can only release your own edit sessions") + return + } + + if err := s.editSessions.Release(ctx, sessionID); err != nil { + s.logger.Error().Err(err).Str("session_id", sessionID).Msg("failed to release edit session") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to release edit session") + return + } + + s.broker.PublishToItem(item.ID, "edit.session_released", mustMarshal(map[string]any{ + "session_id": session.ID, + "item_id": item.ID, + "part_number": partNumber, + "user": user.Username, + "context_level": session.ContextLevel, + "object_id": session.ObjectID, + })) + + w.WriteHeader(http.StatusNoContent) +} + +// HandleListItemEditSessions lists active edit sessions for an item. +// GET /api/items/{partNumber}/edit-sessions +func (s *Server) HandleListItemEditSessions(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get item") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get item") + return + } + if item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + sessions, err := s.editSessions.ListForItem(ctx, item.ID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to list edit sessions") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list edit sessions") + return + } + + out := make([]editSessionResponse, len(sessions)) + for i, sess := range sessions { + out[i] = sessionToResponse(sess, partNumber) + } + writeJSON(w, http.StatusOK, out) +} + +// HandleListUserEditSessions lists active edit sessions for the current user. +// GET /api/edit-sessions +func (s *Server) HandleListUserEditSessions(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + user := auth.UserFromContext(ctx) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Authentication required") + return + } + + sessions, err := s.editSessions.ListForUser(ctx, user.ID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to list edit sessions") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list edit sessions") + return + } + + out := make([]editSessionResponse, len(sessions)) + for i, sess := range sessions { + out[i] = sessionToResponse(sess, "") + } + writeJSON(w, http.StatusOK, out) +} diff --git a/internal/db/edit_sessions.go b/internal/db/edit_sessions.go new file mode 100644 index 0000000..90ba408 --- /dev/null +++ b/internal/db/edit_sessions.go @@ -0,0 +1,222 @@ +package db + +import ( + "context" + "time" + + "github.com/jackc/pgx/v5" +) + +// EditSession represents an active editing context. +type EditSession struct { + ID string + ItemID string + UserID string + WorkstationID string + ContextLevel string + ObjectID *string + DependencyCone []string + AcquiredAt time.Time + LastHeartbeat time.Time +} + +// EditSessionRepository provides edit session database operations. +type EditSessionRepository struct { + db *DB +} + +// NewEditSessionRepository creates a new edit session repository. +func NewEditSessionRepository(db *DB) *EditSessionRepository { + return &EditSessionRepository{db: db} +} + +// Acquire inserts a new edit session. Returns a unique constraint error +// if another session already holds the same (item_id, context_level, object_id). +func (r *EditSessionRepository) Acquire(ctx context.Context, s *EditSession) error { + return r.db.pool.QueryRow(ctx, ` + INSERT INTO edit_sessions (item_id, user_id, workstation_id, context_level, object_id, dependency_cone) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, acquired_at, last_heartbeat + `, s.ItemID, s.UserID, s.WorkstationID, s.ContextLevel, s.ObjectID, s.DependencyCone). + Scan(&s.ID, &s.AcquiredAt, &s.LastHeartbeat) +} + +// Release deletes an edit session by ID. +func (r *EditSessionRepository) Release(ctx context.Context, id string) error { + _, err := r.db.pool.Exec(ctx, `DELETE FROM edit_sessions WHERE id = $1`, id) + return err +} + +// ReleaseForWorkstation deletes all sessions for a workstation, returning +// the released sessions so callers can publish SSE notifications. +func (r *EditSessionRepository) ReleaseForWorkstation(ctx context.Context, workstationID string) ([]EditSession, error) { + rows, err := r.db.pool.Query(ctx, ` + DELETE FROM edit_sessions + WHERE workstation_id = $1 + RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat + `, workstationID) + if err != nil { + return nil, err + } + defer rows.Close() + + var sessions []EditSession + for rows.Next() { + var s EditSession + if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat); err != nil { + return nil, err + } + sessions = append(sessions, s) + } + return sessions, rows.Err() +} + +// GetByID returns an edit session by its ID. +func (r *EditSessionRepository) GetByID(ctx context.Context, id string) (*EditSession, error) { + s := &EditSession{} + err := r.db.pool.QueryRow(ctx, ` + SELECT id, item_id, user_id, workstation_id, context_level, object_id, + dependency_cone, acquired_at, last_heartbeat + FROM edit_sessions + WHERE id = $1 + `, id).Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat) + + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return s, nil +} + +// ListForItem returns all active edit sessions for an item. +func (r *EditSessionRepository) ListForItem(ctx context.Context, itemID string) ([]*EditSession, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, item_id, user_id, workstation_id, context_level, object_id, + dependency_cone, acquired_at, last_heartbeat + FROM edit_sessions + WHERE item_id = $1 + ORDER BY acquired_at + `, itemID) + if err != nil { + return nil, err + } + defer rows.Close() + + var sessions []*EditSession + for rows.Next() { + s := &EditSession{} + if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat); err != nil { + return nil, err + } + sessions = append(sessions, s) + } + return sessions, rows.Err() +} + +// ListForUser returns all active edit sessions for a user. +func (r *EditSessionRepository) ListForUser(ctx context.Context, userID string) ([]*EditSession, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, item_id, user_id, workstation_id, context_level, object_id, + dependency_cone, acquired_at, last_heartbeat + FROM edit_sessions + WHERE user_id = $1 + ORDER BY acquired_at + `, userID) + if err != nil { + return nil, err + } + defer rows.Close() + + var sessions []*EditSession + for rows.Next() { + s := &EditSession{} + if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat); err != nil { + return nil, err + } + sessions = append(sessions, s) + } + return sessions, rows.Err() +} + +// TouchHeartbeat updates last_heartbeat for all sessions of a workstation. +func (r *EditSessionRepository) TouchHeartbeat(ctx context.Context, workstationID string) error { + _, err := r.db.pool.Exec(ctx, ` + UPDATE edit_sessions SET last_heartbeat = now() WHERE workstation_id = $1 + `, workstationID) + return err +} + +// ExpireStale deletes sessions whose last_heartbeat is older than the given +// timeout, returning the expired sessions for SSE notification. +func (r *EditSessionRepository) ExpireStale(ctx context.Context, timeout time.Duration) ([]EditSession, error) { + rows, err := r.db.pool.Query(ctx, ` + DELETE FROM edit_sessions + WHERE last_heartbeat < now() - $1::interval + RETURNING id, item_id, user_id, workstation_id, context_level, object_id, dependency_cone, acquired_at, last_heartbeat + `, timeout.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var sessions []EditSession + for rows.Next() { + var s EditSession + if err := rows.Scan(&s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat); err != nil { + return nil, err + } + sessions = append(sessions, s) + } + return sessions, rows.Err() +} + +// GetConflict returns the existing session holding a given (item, context_level, object_id) +// slot, for building 409 conflict responses. +func (r *EditSessionRepository) GetConflict(ctx context.Context, itemID, contextLevel string, objectID *string) (*EditSession, error) { + s := &EditSession{} + var query string + var args []any + + if objectID != nil { + query = ` + SELECT id, item_id, user_id, workstation_id, context_level, object_id, + dependency_cone, acquired_at, last_heartbeat + FROM edit_sessions + WHERE item_id = $1 AND context_level = $2 AND object_id = $3 + ` + args = []any{itemID, contextLevel, *objectID} + } else { + query = ` + SELECT id, item_id, user_id, workstation_id, context_level, object_id, + dependency_cone, acquired_at, last_heartbeat + FROM edit_sessions + WHERE item_id = $1 AND context_level = $2 AND object_id IS NULL + ` + args = []any{itemID, contextLevel} + } + + err := r.db.pool.QueryRow(ctx, query, args...).Scan( + &s.ID, &s.ItemID, &s.UserID, &s.WorkstationID, + &s.ContextLevel, &s.ObjectID, &s.DependencyCone, + &s.AcquiredAt, &s.LastHeartbeat) + + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return s, nil +} diff --git a/internal/db/migrations/023_edit_sessions.sql b/internal/db/migrations/023_edit_sessions.sql new file mode 100644 index 0000000..0fe8614 --- /dev/null +++ b/internal/db/migrations/023_edit_sessions.sql @@ -0,0 +1,17 @@ +-- 023_edit_sessions.sql — active editing context tracking + +CREATE TABLE edit_sessions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + item_id UUID NOT NULL REFERENCES items(id) ON DELETE CASCADE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + workstation_id UUID NOT NULL REFERENCES workstations(id) ON DELETE CASCADE, + context_level TEXT NOT NULL CHECK (context_level IN ('sketch', 'partdesign', 'assembly')), + object_id TEXT, + dependency_cone TEXT[], + acquired_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_edit_sessions_item ON edit_sessions(item_id); +CREATE INDEX idx_edit_sessions_user ON edit_sessions(user_id); +CREATE UNIQUE INDEX idx_edit_sessions_active ON edit_sessions(item_id, context_level, object_id);