feat(sse): per-connection filtering with user and workstation context #162

Closed
opened 2026-03-01 15:36:31 +00:00 by forbes · 0 comments
Owner

Context

Sub-issue of #125 (Context-Aware Part Subscription System).

The current SSE broker (internal/api/broker.go) is a pure fan-out: every connected client receives every event. For edit sessions and subscriptions we need:

  1. Identity on connections — know which user and workstation are on each SSE connection
  2. Item-scoped delivery — edit session events only go to clients watching that item
  3. Workstation-scoped delivery — subscription sync events only go to the relevant workstation

Current Broker

type sseClient struct {
    ch     chan Event
    closed chan struct{}
}

No identity. Publish() fans out to all clients. ~130 lines total.

Changes

1. Extend sseClient with context

type sseClient struct {
    ch            chan Event
    closed        chan struct{}
    userID        string            // set on subscribe
    workstationID string            // optional, set via query param
    itemFilters   map[string]bool   // item IDs this client is watching
    mu            sync.RWMutex      // guards itemFilters
}

2. Extend Subscribe() signature

func (b *Broker) Subscribe(userID, workstationID string) *sseClient

The SSE handler (HandleEvents) extracts userID from the auth context and workstationID from the ?workstation_id= query param. When a workstation ID is present, the handler also calls Touch() on the workstation to update last_seen.
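A minimal sketch of what the extended Subscribe() could look like, assuming the broker keeps its clients in a mutex-guarded set. The Broker fields, NewBroker, the Event fields, the buffer size of 64, and the idempotent Unsubscribe are illustrative assumptions, not the contents of the real broker.go.

```go
package main

import (
    "fmt"
    "sync"
)

// Event is a stand-in for the broker's event type; its fields are assumed.
type Event struct{ Type, Data string }

// sseClient mirrors the extended struct proposed in this issue.
type sseClient struct {
    ch            chan Event
    closed        chan struct{}
    userID        string
    workstationID string
    itemFilters   map[string]bool
    mu            sync.RWMutex // guards itemFilters
}

// Broker is a hypothetical minimal broker holding a set of clients.
type Broker struct {
    mu      sync.RWMutex
    clients map[*sseClient]struct{}
}

func NewBroker() *Broker {
    return &Broker{clients: make(map[*sseClient]struct{})}
}

// Subscribe registers a client and records who is on the connection.
// An empty workstationID means the client did not identify a workstation.
func (b *Broker) Subscribe(userID, workstationID string) *sseClient {
    c := &sseClient{
        ch:            make(chan Event, 64), // buffer size is an assumption
        closed:        make(chan struct{}),
        userID:        userID,
        workstationID: workstationID,
        itemFilters:   make(map[string]bool),
    }
    b.mu.Lock()
    b.clients[c] = struct{}{}
    b.mu.Unlock()
    return c
}

// Unsubscribe removes the client; calling it twice is harmless.
func (b *Broker) Unsubscribe(c *sseClient) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if _, ok := b.clients[c]; ok {
        delete(b.clients, c)
        close(c.closed)
    }
}

func main() {
    b := NewBroker()
    c := b.Subscribe("user-1", "ws-1")
    fmt.Println(c.userID, c.workstationID, len(b.clients))
    b.Unsubscribe(c)
    fmt.Println(len(b.clients))
}
```

Initializing itemFilters at subscribe time means later WatchItem calls never have to handle a nil map.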

3. Add item subscription methods

func (c *sseClient) WatchItem(itemID string)
func (c *sseClient) UnwatchItem(itemID string)

These are called by the edit session handlers when a session is acquired or released. A future endpoint could also let clients call them explicitly.
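The two methods above could be implemented roughly as follows. This is a sketch under assumptions: isWatching is a hypothetical read-side helper that a targeted publish could use, and the struct is trimmed to the fields the methods touch.

```go
package main

import (
    "fmt"
    "sync"
)

// sseClient is trimmed to the fields relevant here; the channel and
// identity fields from the proposal are omitted for brevity.
type sseClient struct {
    itemFilters map[string]bool
    mu          sync.RWMutex // guards itemFilters
}

// WatchItem adds itemID to the set of items this client is watching.
func (c *sseClient) WatchItem(itemID string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.itemFilters == nil {
        c.itemFilters = make(map[string]bool)
    }
    c.itemFilters[itemID] = true
}

// UnwatchItem removes itemID; removing an ID that is not watched is a no-op.
func (c *sseClient) UnwatchItem(itemID string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.itemFilters, itemID)
}

// isWatching is a hypothetical helper: it reports whether itemID is in
// the filter set, taking only the read lock.
func (c *sseClient) isWatching(itemID string) bool {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.itemFilters[itemID]
}

func main() {
    c := &sseClient{}
    c.WatchItem("item-1")
    fmt.Println(c.isWatching("item-1"), c.isWatching("item-2"))
    c.UnwatchItem("item-1")
    fmt.Println(c.isWatching("item-1"))
}
```

The write methods take the full lock while the publish path only needs the read lock, so concurrent publishes to different items do not serialize on one client.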

4. Add targeted publish methods

// PublishToItem sends an event only to clients watching a specific item.
func (b *Broker) PublishToItem(itemID, eventType, data string)

// PublishToWorkstation sends an event only to the specific workstation.
func (b *Broker) PublishToWorkstation(workstationID, eventType, data string)

// PublishToUser sends an event to all connections for a user.
func (b *Broker) PublishToUser(userID, eventType, data string)

The existing Publish() (broadcast) remains for global events like heartbeat, server.state, item.created, etc.
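One way to keep the three targeted methods small is to route them through a shared predicate-based helper. The sketch below assumes a broker with a mutex-guarded client set and a non-blocking send that drops events for slow clients; publishWhere, the Broker fields, the drop policy, and the event names in main are all hypothetical and may differ from the real broker.

```go
package main

import (
    "fmt"
    "sync"
)

// Event is a stand-in for the broker's event type; its fields are assumed.
type Event struct{ Type, Data string }

type sseClient struct {
    ch            chan Event
    userID        string
    workstationID string
    itemFilters   map[string]bool
    mu            sync.RWMutex // guards itemFilters
}

type Broker struct {
    mu      sync.RWMutex
    clients map[*sseClient]struct{}
}

// publishWhere (hypothetical) sends ev to every client accepted by match.
// A full client buffer drops the event rather than blocking the publisher.
func (b *Broker) publishWhere(match func(*sseClient) bool, ev Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for c := range b.clients {
        if !match(c) {
            continue
        }
        select {
        case c.ch <- ev:
        default: // slow client: drop (assumed policy)
        }
    }
}

// PublishToItem sends an event only to clients watching itemID.
func (b *Broker) PublishToItem(itemID, eventType, data string) {
    b.publishWhere(func(c *sseClient) bool {
        c.mu.RLock()
        defer c.mu.RUnlock()
        return c.itemFilters[itemID]
    }, Event{Type: eventType, Data: data})
}

// PublishToWorkstation sends an event only to the given workstation.
// An empty ID is rejected so it never matches clients that sent none.
func (b *Broker) PublishToWorkstation(workstationID, eventType, data string) {
    if workstationID == "" {
        return
    }
    b.publishWhere(func(c *sseClient) bool {
        return c.workstationID == workstationID
    }, Event{Type: eventType, Data: data})
}

// PublishToUser sends an event to all connections for a user.
func (b *Broker) PublishToUser(userID, eventType, data string) {
    b.publishWhere(func(c *sseClient) bool {
        return c.userID == userID
    }, Event{Type: eventType, Data: data})
}

func main() {
    alice := &sseClient{ch: make(chan Event, 4), userID: "alice",
        workstationID: "ws-1", itemFilters: map[string]bool{"item-1": true}}
    bob := &sseClient{ch: make(chan Event, 4), userID: "bob"}
    b := &Broker{clients: map[*sseClient]struct{}{alice: {}, bob: {}}}

    b.PublishToItem("item-1", "example.item_event", "{}")
    b.PublishToUser("bob", "example.user_event", "{}")
    fmt.Println(len(alice.ch), len(bob.ch))
}
```

The empty-ID guard in PublishToWorkstation matters because clients that connect without ?workstation_id= all carry an empty workstationID and would otherwise match each other.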

5. Update HandleEvents

In internal/api/sse_handler.go:

func (s *Server) HandleEvents(w http.ResponseWriter, r *http.Request) {
    user := auth.UserFromContext(r.Context())
    wsID := r.URL.Query().Get("workstation_id")

    client := s.broker.Subscribe(user.ID, wsID)
    defer s.broker.Unsubscribe(client)

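    // If a workstation ID was provided, update its last_seen timestamp.
    // Touch runs in a goroutine so it does not delay the stream. Note that
    // r.Context() is cancelled when the client disconnects, so a very
    // short-lived connection may abort the update.
    if wsID != "" {
        go s.workstations.Touch(r.Context(), wsID)
    }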
    // ... rest unchanged
}

6. History replay filtering

EventsSince() currently replays all events. For targeted events, either:

  • Store the target scope (itemID/workstationID) in the Event struct and filter on replay
  • Or only replay broadcast events (simpler, acceptable since edit session events are ephemeral)

Recommend: only replay broadcast events. Targeted events are real-time only.
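The recommended option can be sketched by never writing targeted events into the replay log in the first place. Everything here is an assumption for illustration: the history type, the record helper, the numeric Event.ID, and this EventsSince signature may not match the real broker, and the sketch omits locking.

```go
package main

import "fmt"

// Event is a stand-in for the broker's event type; the ID field is assumed.
type Event struct {
    ID   uint64
    Type string
    Data string
}

// history is a hypothetical replay log. It is not goroutine-safe; a real
// broker would guard it with its existing mutex.
type history struct {
    nextID uint64
    events []Event
}

// record assigns the next ID to every event but retains only broadcast
// events, so targeted events are real-time only and never replayed.
func (h *history) record(eventType, data string, broadcast bool) Event {
    h.nextID++
    ev := Event{ID: h.nextID, Type: eventType, Data: data}
    if broadcast {
        h.events = append(h.events, ev)
    }
    return ev
}

// EventsSince returns the retained events with an ID greater than lastID.
func (h *history) EventsSince(lastID uint64) []Event {
    var out []Event
    for _, ev := range h.events {
        if ev.ID > lastID {
            out = append(out, ev)
        }
    }
    return out
}

func main() {
    h := &history{}
    h.record("item.created", "{}", true)
    h.record("example.targeted_event", "{}", false) // targeted: not retained
    h.record("server.state", "{}", true)
    for _, ev := range h.EventsSince(0) {
        fmt.Println(ev.ID, ev.Type)
    }
}
```

Because targeted events still consume an ID, a client that reconnects with the ID of a targeted event as its last-seen value still gets every later broadcast event from the greater-than comparison.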

Files to Modify

  • internal/api/broker.go — extend sseClient, add targeted publish methods
  • internal/api/sse_handler.go — pass identity on subscribe

Acceptance Criteria

  • [x] SSE connections carry userID and optional workstationID
  • [ ] Publish() (broadcast) still works for all existing events
  • [ ] PublishToItem() delivers only to clients with that item in their filter
  • [ ] PublishToWorkstation() delivers only to matching workstation
  • [ ] PublishToUser() delivers to all connections for a user
  • [ ] Existing SSE consumers (web UI) unaffected (no workstation_id param = broadcast only)
  • [ ] Workstation last_seen updated on SSE connect

Depends On

  • #161 (workstations table)

Part Of

#125

Reference: kindred/silo#162