Compare commits

..

1 Commits

Author SHA1 Message Date
Zoe Forbes
3d38e4b4c3 feat: handle DAG and job SSE events in SiloEventListener
New signals:
- dag_updated(part_number, node_count, edge_count)
- dag_validated(part_number, valid, failed_count)
- job_created/claimed/progress/completed/failed/cancelled

Dispatch logic parses payloads and emits typed signals for
downstream UI and logging consumers.

Closes kindred/create#218
2026-02-14 15:22:29 -06:00

View File

@@ -2338,6 +2338,18 @@ class SiloEventListener(QtCore.QThread):
) # (status, retry_count, error_message)
server_mode_changed = QtCore.Signal(str) # "normal" / "read-only" / "degraded"
# DAG events
dag_updated = QtCore.Signal(str, int, int) # part_number, node_count, edge_count
dag_validated = QtCore.Signal(str, bool, int) # part_number, valid, failed_count
# Job lifecycle events
job_created = QtCore.Signal(str, str, str) # job_id, definition_name, part_number
job_claimed = QtCore.Signal(str, str) # job_id, runner_id
job_progress = QtCore.Signal(str, int, str) # job_id, progress, message
job_completed = QtCore.Signal(str) # job_id
job_failed = QtCore.Signal(str, str) # job_id, error
job_cancelled = QtCore.Signal(str) # job_id
_MAX_RETRIES = 10
_BASE_DELAY = 1 # seconds, doubles each retry
_MAX_DELAY = 60 # seconds, backoff cap
@@ -2454,6 +2466,35 @@ class SiloEventListener(QtCore.QThread):
self.server_mode_changed.emit(mode)
return
# Job lifecycle events (keyed by job_id, not part_number)
job_id = payload.get("job_id", "")
if event_type == "job.created":
self.job_created.emit(
job_id,
payload.get("definition_name", ""),
payload.get("part_number", ""),
)
return
if event_type == "job.claimed":
self.job_claimed.emit(job_id, payload.get("runner_id", ""))
return
if event_type == "job.progress":
self.job_progress.emit(
job_id,
int(payload.get("progress", 0)),
payload.get("message", ""),
)
return
if event_type == "job.completed":
self.job_completed.emit(job_id)
return
if event_type == "job.failed":
self.job_failed.emit(job_id, payload.get("error", ""))
return
if event_type == "job.cancelled":
self.job_cancelled.emit(job_id)
return
pn = payload.get("part_number", "")
if not pn:
return
@@ -2463,6 +2504,18 @@ class SiloEventListener(QtCore.QThread):
elif event_type == "revision_created":
rev = payload.get("revision", 0)
self.revision_created.emit(pn, int(rev))
elif event_type == "dag.updated":
self.dag_updated.emit(
pn,
int(payload.get("node_count", 0)),
int(payload.get("edge_count", 0)),
)
elif event_type == "dag.validated":
self.dag_validated.emit(
pn,
bool(payload.get("valid", False)),
int(payload.get("failed_count", 0)),
)
class _SSEUnsupported(Exception):