Files
silo/internal/api/broker_test.go
Forbes e7da3ee94d feat(sse): per-connection filtering with user and workstation context
- Extend sseClient with userID, workstationID, and item filter set
- Update Subscribe() to accept userID and workstationID params
- Add WatchItem/UnwatchItem/IsWatchingItem methods on sseClient
- Add PublishToItem, PublishToWorkstation, PublishToUser targeted delivery
- Targeted events get IDs but skip history ring buffer (real-time only)
- Update HandleEvents to pass auth user ID and workstation_id query param
- Touch workstation last_seen on SSE connect
- Existing Publish() broadcast unchanged; all current callers unaffected
- Add 5 new tests for targeted delivery and item watch lifecycle

Closes #162
2026-03-01 10:04:01 -06:00

273 lines
6.4 KiB
Go

package api
import (
"testing"
"time"
"github.com/rs/zerolog"
)
func TestBrokerSubscribeUnsubscribe(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("", "")
if b.ClientCount() != 1 {
t.Fatalf("expected 1 client, got %d", b.ClientCount())
}
b.Unsubscribe(c)
if b.ClientCount() != 0 {
t.Fatalf("expected 0 clients, got %d", b.ClientCount())
}
}
func TestBrokerPublish(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("", "")
defer b.Unsubscribe(c)
b.Publish("item.created", `{"part_number":"F01-0001"}`)
select {
case ev := <-c.ch:
if ev.Type != "item.created" {
t.Fatalf("expected type item.created, got %s", ev.Type)
}
if ev.ID != 1 {
t.Fatalf("expected ID 1, got %d", ev.ID)
}
if ev.Data != `{"part_number":"F01-0001"}` {
t.Fatalf("unexpected data: %s", ev.Data)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for event")
}
}
func TestBrokerPublishDropsSlow(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("", "")
defer b.Unsubscribe(c)
// Fill the client's channel
for i := 0; i < clientChanSize+10; i++ {
b.Publish("heartbeat", "{}")
}
// Should have clientChanSize events buffered, rest dropped
count := len(c.ch)
if count != clientChanSize {
t.Fatalf("expected %d buffered events, got %d", clientChanSize, count)
}
}
func TestBrokerEventsSince(t *testing.T) {
b := NewBroker(zerolog.Nop())
b.Publish("item.created", `{"pn":"A"}`)
b.Publish("item.updated", `{"pn":"B"}`)
b.Publish("item.deleted", `{"pn":"C"}`)
events := b.EventsSince(1) // after ID 1
if len(events) != 2 {
t.Fatalf("expected 2 events, got %d", len(events))
}
if events[0].Type != "item.updated" {
t.Fatalf("expected item.updated, got %s", events[0].Type)
}
if events[1].Type != "item.deleted" {
t.Fatalf("expected item.deleted, got %s", events[1].Type)
}
// No events after the latest
events = b.EventsSince(3)
if len(events) != 0 {
t.Fatalf("expected 0 events, got %d", len(events))
}
}
func TestBrokerClientCount(t *testing.T) {
b := NewBroker(zerolog.Nop())
c1 := b.Subscribe("", "")
c2 := b.Subscribe("", "")
c3 := b.Subscribe("", "")
if b.ClientCount() != 3 {
t.Fatalf("expected 3 clients, got %d", b.ClientCount())
}
b.Unsubscribe(c2)
if b.ClientCount() != 2 {
t.Fatalf("expected 2 clients, got %d", b.ClientCount())
}
b.Unsubscribe(c1)
b.Unsubscribe(c3)
if b.ClientCount() != 0 {
t.Fatalf("expected 0 clients, got %d", b.ClientCount())
}
}
func TestBrokerShutdown(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("", "")
b.Shutdown()
// Client's closed channel should be closed
select {
case <-c.closed:
// expected
case <-time.After(time.Second):
t.Fatal("client closed channel not closed after shutdown")
}
if b.ClientCount() != 0 {
t.Fatalf("expected 0 clients after shutdown, got %d", b.ClientCount())
}
}
func TestBrokerMonotonicIDs(t *testing.T) {
b := NewBroker(zerolog.Nop())
b.Publish("a", "{}")
b.Publish("b", "{}")
b.Publish("c", "{}")
events := b.EventsSince(0)
if len(events) != 3 {
t.Fatalf("expected 3 events, got %d", len(events))
}
for i := 1; i < len(events); i++ {
if events[i].ID <= events[i-1].ID {
t.Fatalf("event IDs not monotonic: %d <= %d", events[i].ID, events[i-1].ID)
}
}
}
func TestWatchUnwatchItem(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("user1", "ws1")
defer b.Unsubscribe(c)
if c.IsWatchingItem("item-abc") {
t.Fatal("should not be watching item-abc before WatchItem")
}
c.WatchItem("item-abc")
if !c.IsWatchingItem("item-abc") {
t.Fatal("should be watching item-abc after WatchItem")
}
c.UnwatchItem("item-abc")
if c.IsWatchingItem("item-abc") {
t.Fatal("should not be watching item-abc after UnwatchItem")
}
}
func TestPublishToItem(t *testing.T) {
b := NewBroker(zerolog.Nop())
watcher := b.Subscribe("user1", "ws1")
defer b.Unsubscribe(watcher)
bystander := b.Subscribe("user2", "ws2")
defer b.Unsubscribe(bystander)
watcher.WatchItem("item-abc")
b.PublishToItem("item-abc", "edit.started", `{"item_id":"item-abc"}`)
// Watcher should receive the event.
select {
case ev := <-watcher.ch:
if ev.Type != "edit.started" {
t.Fatalf("expected edit.started, got %s", ev.Type)
}
case <-time.After(time.Second):
t.Fatal("watcher did not receive targeted event")
}
// Bystander should not.
select {
case ev := <-bystander.ch:
t.Fatalf("bystander should not receive targeted event, got %s", ev.Type)
case <-time.After(50 * time.Millisecond):
// expected
}
}
func TestPublishToWorkstation(t *testing.T) {
b := NewBroker(zerolog.Nop())
target := b.Subscribe("user1", "ws-target")
defer b.Unsubscribe(target)
other := b.Subscribe("user1", "ws-other")
defer b.Unsubscribe(other)
b.PublishToWorkstation("ws-target", "sync.update", `{"data":"x"}`)
select {
case ev := <-target.ch:
if ev.Type != "sync.update" {
t.Fatalf("expected sync.update, got %s", ev.Type)
}
case <-time.After(time.Second):
t.Fatal("target workstation did not receive event")
}
select {
case ev := <-other.ch:
t.Fatalf("other workstation should not receive event, got %s", ev.Type)
case <-time.After(50 * time.Millisecond):
// expected
}
}
func TestPublishToUser(t *testing.T) {
b := NewBroker(zerolog.Nop())
c1 := b.Subscribe("user1", "ws1")
defer b.Unsubscribe(c1)
c2 := b.Subscribe("user1", "ws2")
defer b.Unsubscribe(c2)
c3 := b.Subscribe("user2", "ws3")
defer b.Unsubscribe(c3)
b.PublishToUser("user1", "user.notify", `{"msg":"hello"}`)
// Both user1 connections should receive.
for _, c := range []*sseClient{c1, c2} {
select {
case ev := <-c.ch:
if ev.Type != "user.notify" {
t.Fatalf("expected user.notify, got %s", ev.Type)
}
case <-time.After(time.Second):
t.Fatal("user1 client did not receive event")
}
}
// user2 should not.
select {
case ev := <-c3.ch:
t.Fatalf("user2 should not receive event, got %s", ev.Type)
case <-time.After(50 * time.Millisecond):
// expected
}
}
func TestTargetedEventsNotInHistory(t *testing.T) {
b := NewBroker(zerolog.Nop())
c := b.Subscribe("user1", "ws1")
defer b.Unsubscribe(c)
c.WatchItem("item-abc")
b.Publish("broadcast", `{}`)
b.PublishToItem("item-abc", "targeted", `{}`)
events := b.EventsSince(0)
if len(events) != 1 {
t.Fatalf("expected 1 event in history (broadcast only), got %d", len(events))
}
if events[0].Type != "broadcast" {
t.Fatalf("expected broadcast event in history, got %s", events[0].Type)
}
}