Add server-side solver service module with REST API endpoints, database
schema, job definitions, and runner result caching.
New files:
- migrations/021_solver_results.sql: solver_results table with upsert constraint
- internal/db/solver_results.go: SolverResultRepository (Upsert, GetByItem, GetByItemRevision)
- internal/api/solver_handlers.go: solver API handlers and maybeCacheSolverResult hook
- jobdefs/assembly-solve.yaml: manual solve job definition
- jobdefs/assembly-validate.yaml: auto-validate on revision creation
- jobdefs/assembly-kinematic.yaml: manual kinematic simulation job
Modified:
- internal/config/config.go: SolverConfig struct with max_context_size_mb, default_timeout
- internal/modules/modules.go, loader.go: register solver module (depends on jobs)
- internal/db/jobs.go: ListSolverJobs helper with definition_name prefix filter
- internal/api/handlers.go: wire SolverResultRepository into Server
- internal/api/routes.go: /api/solver/* routes + /api/items/{partNumber}/solver/results
- internal/api/runner_handlers.go: async result cache hook on job completion
API endpoints:
- POST /api/solver/jobs — submit solver job (editor)
- GET /api/solver/jobs — list solver jobs with filters
- GET /api/solver/jobs/{id} — get solver job status
- POST /api/solver/jobs/{id}/cancel — cancel solver job (editor)
- GET /api/solver/solvers — registry of available solvers
- GET /api/items/{pn}/solver/results — cached results for item
Also fixes pre-existing test compilation errors (missing workflows param
in NewServer calls across 6 test files).
809 lines
25 KiB
Go
809 lines
25 KiB
Go
package db
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)
|
|
|
|
// Runner is the in-memory form of a row in the runners table: a compute
// worker that has been registered with the server.
type Runner struct {
	ID   string
	Name string

	// TokenHash is the column GetRunnerByToken matches on; TokenPrefix is
	// persisted next to it.
	TokenHash   string
	TokenPrefix string

	Tags []string

	// Status is written as "offline" by RegisterRunner and
	// ExpireStaleRunners, and as "online" by Heartbeat.
	Status string

	// Pointer fields are nil when the corresponding column is NULL.
	LastHeartbeat *time.Time
	LastJobID     *string

	// Metadata is persisted as JSON.
	Metadata map[string]any

	CreatedAt time.Time
	UpdatedAt time.Time
}
|
|
|
|
// JobDefinitionRecord mirrors one row of the job_definitions table.
type JobDefinitionRecord struct {
	ID string
	// Name is the conflict key used by UpsertDefinition.
	Name    string
	Version int

	TriggerType string
	ScopeType   string
	ComputeType string

	RunnerTags     []string
	TimeoutSeconds int
	MaxRetries     int
	Priority       int

	// Definition is persisted as JSON.
	Definition map[string]any
	// Enabled is the flag GetDefinitionsByTrigger filters on.
	Enabled bool

	CreatedAt time.Time
	UpdatedAt time.Time
}
|
|
|
|
// Job mirrors one row of the jobs table: a single instance of compute work.
// Pointer fields are nil when the corresponding column is NULL.
type Job struct {
	ID              string
	JobDefinitionID *string
	DefinitionName  string

	// Status values written by this repository are "pending", "claimed",
	// "running", "completed", "failed" and "cancelled".
	Status   string
	Priority int

	ItemID    *string
	ProjectID *string
	// ScopeMetadata is persisted as JSON.
	ScopeMetadata map[string]any

	RunnerID   *string
	RunnerTags []string

	CreatedAt   time.Time
	ClaimedAt   *time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time

	TimeoutSeconds int
	// ExpiresAt is set by ClaimJob to the claim time plus TimeoutSeconds.
	ExpiresAt *time.Time

	Progress        int
	ProgressMessage *string

	// Result is persisted as JSON.
	Result       map[string]any
	ErrorMessage *string

	RetryCount int
	MaxRetries int

	CreatedBy   *string
	CancelledBy *string
}
|
|
|
|
// JobLogEntry mirrors one row of the job_log table: a single log line
// attached to a job.
type JobLogEntry struct {
	ID    string
	JobID string

	// Timestamp is read back from the database when the entry is appended.
	Timestamp time.Time

	Level   string
	Message string

	// Metadata is persisted as JSON.
	Metadata map[string]any
}
|
|
|
|
// JobRepository provides job and runner database operations.
|
|
type JobRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
// NewJobRepository creates a new job repository.
|
|
func NewJobRepository(db *DB) *JobRepository {
|
|
return &JobRepository{db: db}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Job Definitions
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// UpsertDefinition inserts or updates a job definition record.
|
|
func (r *JobRepository) UpsertDefinition(ctx context.Context, d *JobDefinitionRecord) error {
|
|
defJSON, err := json.Marshal(d.Definition)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling definition: %w", err)
|
|
}
|
|
|
|
err = r.db.pool.QueryRow(ctx, `
|
|
INSERT INTO job_definitions (name, version, trigger_type, scope_type, compute_type,
|
|
runner_tags, timeout_seconds, max_retries, priority,
|
|
definition, enabled)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
|
ON CONFLICT (name) DO UPDATE SET
|
|
version = EXCLUDED.version,
|
|
trigger_type = EXCLUDED.trigger_type,
|
|
scope_type = EXCLUDED.scope_type,
|
|
compute_type = EXCLUDED.compute_type,
|
|
runner_tags = EXCLUDED.runner_tags,
|
|
timeout_seconds = EXCLUDED.timeout_seconds,
|
|
max_retries = EXCLUDED.max_retries,
|
|
priority = EXCLUDED.priority,
|
|
definition = EXCLUDED.definition,
|
|
enabled = EXCLUDED.enabled,
|
|
updated_at = now()
|
|
RETURNING id, created_at, updated_at
|
|
`, d.Name, d.Version, d.TriggerType, d.ScopeType, d.ComputeType,
|
|
d.RunnerTags, d.TimeoutSeconds, d.MaxRetries, d.Priority,
|
|
defJSON, d.Enabled,
|
|
).Scan(&d.ID, &d.CreatedAt, &d.UpdatedAt)
|
|
if err != nil {
|
|
return fmt.Errorf("upserting job definition: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetDefinition returns a job definition by name.
|
|
func (r *JobRepository) GetDefinition(ctx context.Context, name string) (*JobDefinitionRecord, error) {
|
|
d := &JobDefinitionRecord{}
|
|
var defJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
|
runner_tags, timeout_seconds, max_retries, priority,
|
|
definition, enabled, created_at, updated_at
|
|
FROM job_definitions WHERE name = $1
|
|
`, name).Scan(
|
|
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
|
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
|
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying job definition: %w", err)
|
|
}
|
|
if defJSON != nil {
|
|
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
|
}
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
// ListDefinitions returns all job definitions.
|
|
func (r *JobRepository) ListDefinitions(ctx context.Context) ([]*JobDefinitionRecord, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
|
runner_tags, timeout_seconds, max_retries, priority,
|
|
definition, enabled, created_at, updated_at
|
|
FROM job_definitions ORDER BY name
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying job definitions: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanJobDefinitions(rows)
|
|
}
|
|
|
|
// GetDefinitionsByTrigger returns all enabled definitions matching a trigger type.
|
|
func (r *JobRepository) GetDefinitionsByTrigger(ctx context.Context, triggerType string) ([]*JobDefinitionRecord, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
|
runner_tags, timeout_seconds, max_retries, priority,
|
|
definition, enabled, created_at, updated_at
|
|
FROM job_definitions
|
|
WHERE trigger_type = $1 AND enabled = true
|
|
ORDER BY priority ASC, name
|
|
`, triggerType)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying definitions by trigger: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanJobDefinitions(rows)
|
|
}
|
|
|
|
// GetDefinitionByID returns a job definition by ID.
|
|
func (r *JobRepository) GetDefinitionByID(ctx context.Context, id string) (*JobDefinitionRecord, error) {
|
|
d := &JobDefinitionRecord{}
|
|
var defJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
|
runner_tags, timeout_seconds, max_retries, priority,
|
|
definition, enabled, created_at, updated_at
|
|
FROM job_definitions WHERE id = $1
|
|
`, id).Scan(
|
|
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
|
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
|
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying job definition by ID: %w", err)
|
|
}
|
|
if defJSON != nil {
|
|
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
|
}
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Jobs
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// CreateJob inserts a new job.
|
|
func (r *JobRepository) CreateJob(ctx context.Context, j *Job) error {
|
|
scopeJSON, err := json.Marshal(j.ScopeMetadata)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling scope metadata: %w", err)
|
|
}
|
|
|
|
err = r.db.pool.QueryRow(ctx, `
|
|
INSERT INTO jobs (job_definition_id, definition_name, status, priority,
|
|
item_id, project_id, scope_metadata,
|
|
runner_tags, timeout_seconds, max_retries, created_by)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
|
RETURNING id, created_at
|
|
`, j.JobDefinitionID, j.DefinitionName, "pending", j.Priority,
|
|
j.ItemID, j.ProjectID, scopeJSON,
|
|
j.RunnerTags, j.TimeoutSeconds, j.MaxRetries, j.CreatedBy,
|
|
).Scan(&j.ID, &j.CreatedAt)
|
|
if err != nil {
|
|
return fmt.Errorf("creating job: %w", err)
|
|
}
|
|
j.Status = "pending"
|
|
return nil
|
|
}
|
|
|
|
// GetJob returns a job by ID.
|
|
func (r *JobRepository) GetJob(ctx context.Context, jobID string) (*Job, error) {
|
|
j := &Job{}
|
|
var scopeJSON, resultJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, job_definition_id, definition_name, status, priority,
|
|
item_id, project_id, scope_metadata, runner_id, runner_tags,
|
|
created_at, claimed_at, started_at, completed_at,
|
|
timeout_seconds, expires_at, progress, progress_message,
|
|
result, error_message, retry_count, max_retries,
|
|
created_by, cancelled_by
|
|
FROM jobs WHERE id = $1
|
|
`, jobID).Scan(
|
|
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority,
|
|
&j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags,
|
|
&j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt,
|
|
&j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage,
|
|
&resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries,
|
|
&j.CreatedBy, &j.CancelledBy,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying job: %w", err)
|
|
}
|
|
if scopeJSON != nil {
|
|
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
|
}
|
|
}
|
|
if resultJSON != nil {
|
|
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
|
}
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// ListJobs returns jobs matching optional filters.
|
|
func (r *JobRepository) ListJobs(ctx context.Context, status, itemID string, limit, offset int) ([]*Job, error) {
|
|
query := `
|
|
SELECT id, job_definition_id, definition_name, status, priority,
|
|
item_id, project_id, scope_metadata, runner_id, runner_tags,
|
|
created_at, claimed_at, started_at, completed_at,
|
|
timeout_seconds, expires_at, progress, progress_message,
|
|
result, error_message, retry_count, max_retries,
|
|
created_by, cancelled_by
|
|
FROM jobs WHERE 1=1`
|
|
args := []any{}
|
|
argN := 1
|
|
|
|
if status != "" {
|
|
query += fmt.Sprintf(" AND status = $%d", argN)
|
|
args = append(args, status)
|
|
argN++
|
|
}
|
|
if itemID != "" {
|
|
query += fmt.Sprintf(" AND item_id = $%d", argN)
|
|
args = append(args, itemID)
|
|
argN++
|
|
}
|
|
|
|
query += " ORDER BY created_at DESC"
|
|
|
|
if limit > 0 {
|
|
query += fmt.Sprintf(" LIMIT $%d", argN)
|
|
args = append(args, limit)
|
|
argN++
|
|
}
|
|
if offset > 0 {
|
|
query += fmt.Sprintf(" OFFSET $%d", argN)
|
|
args = append(args, offset)
|
|
}
|
|
|
|
rows, err := r.db.pool.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanJobs(rows)
|
|
}
|
|
|
|
// ListSolverJobs returns solver jobs (definition_name LIKE 'assembly-%') with optional filters.
|
|
func (r *JobRepository) ListSolverJobs(ctx context.Context, status, itemID, operation string, limit, offset int) ([]*Job, error) {
|
|
query := `
|
|
SELECT id, job_definition_id, definition_name, status, priority,
|
|
item_id, project_id, scope_metadata, runner_id, runner_tags,
|
|
created_at, claimed_at, started_at, completed_at,
|
|
timeout_seconds, expires_at, progress, progress_message,
|
|
result, error_message, retry_count, max_retries,
|
|
created_by, cancelled_by
|
|
FROM jobs WHERE definition_name LIKE 'assembly-%'`
|
|
args := []any{}
|
|
argN := 1
|
|
|
|
if status != "" {
|
|
query += fmt.Sprintf(" AND status = $%d", argN)
|
|
args = append(args, status)
|
|
argN++
|
|
}
|
|
if itemID != "" {
|
|
query += fmt.Sprintf(" AND item_id = $%d", argN)
|
|
args = append(args, itemID)
|
|
argN++
|
|
}
|
|
if operation != "" {
|
|
query += fmt.Sprintf(" AND scope_metadata->>'operation' = $%d", argN)
|
|
args = append(args, operation)
|
|
argN++
|
|
}
|
|
|
|
query += " ORDER BY created_at DESC"
|
|
|
|
if limit > 0 {
|
|
query += fmt.Sprintf(" LIMIT $%d", argN)
|
|
args = append(args, limit)
|
|
argN++
|
|
}
|
|
if offset > 0 {
|
|
query += fmt.Sprintf(" OFFSET $%d", argN)
|
|
args = append(args, offset)
|
|
}
|
|
|
|
rows, err := r.db.pool.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying solver jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanJobs(rows)
|
|
}
|
|
|
|
// ClaimJob atomically claims the next available job matching the runner's tags.
|
|
// Uses SELECT FOR UPDATE SKIP LOCKED for exactly-once delivery.
|
|
func (r *JobRepository) ClaimJob(ctx context.Context, runnerID string, tags []string) (*Job, error) {
|
|
j := &Job{}
|
|
var scopeJSON, resultJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
WITH claimable AS (
|
|
SELECT id FROM jobs
|
|
WHERE status = 'pending' AND runner_tags <@ $2::text[]
|
|
ORDER BY priority ASC, created_at ASC
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
UPDATE jobs SET
|
|
status = 'claimed',
|
|
runner_id = $1,
|
|
claimed_at = now(),
|
|
expires_at = now() + (timeout_seconds || ' seconds')::interval
|
|
FROM claimable
|
|
WHERE jobs.id = claimable.id
|
|
RETURNING jobs.id, jobs.job_definition_id, jobs.definition_name, jobs.status,
|
|
jobs.priority, jobs.item_id, jobs.project_id, jobs.scope_metadata,
|
|
jobs.runner_id, jobs.runner_tags, jobs.created_at, jobs.claimed_at,
|
|
jobs.started_at, jobs.completed_at, jobs.timeout_seconds, jobs.expires_at,
|
|
jobs.progress, jobs.progress_message, jobs.result, jobs.error_message,
|
|
jobs.retry_count, jobs.max_retries, jobs.created_by, jobs.cancelled_by
|
|
`, runnerID, tags).Scan(
|
|
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status,
|
|
&j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON,
|
|
&j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt,
|
|
&j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt,
|
|
&j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage,
|
|
&j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("claiming job: %w", err)
|
|
}
|
|
if scopeJSON != nil {
|
|
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
|
}
|
|
}
|
|
if resultJSON != nil {
|
|
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
|
}
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// StartJob transitions a claimed job to running.
|
|
func (r *JobRepository) StartJob(ctx context.Context, jobID, runnerID string) error {
|
|
result, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET status = 'running', started_at = now()
|
|
WHERE id = $1 AND runner_id = $2 AND status = 'claimed'
|
|
`, jobID, runnerID)
|
|
if err != nil {
|
|
return fmt.Errorf("starting job: %w", err)
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return fmt.Errorf("job %s not claimable by runner %s or not in claimed state", jobID, runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateProgress updates a running job's progress.
|
|
func (r *JobRepository) UpdateProgress(ctx context.Context, jobID, runnerID string, progress int, message string) error {
|
|
var msg *string
|
|
if message != "" {
|
|
msg = &message
|
|
}
|
|
result, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET progress = $3, progress_message = $4
|
|
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
|
`, jobID, runnerID, progress, msg)
|
|
if err != nil {
|
|
return fmt.Errorf("updating progress: %w", err)
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CompleteJob marks a job as completed with optional result data.
|
|
func (r *JobRepository) CompleteJob(ctx context.Context, jobID, runnerID string, resultData map[string]any) error {
|
|
var resultJSON []byte
|
|
var err error
|
|
if resultData != nil {
|
|
resultJSON, err = json.Marshal(resultData)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling result: %w", err)
|
|
}
|
|
}
|
|
|
|
res, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET
|
|
status = 'completed',
|
|
progress = 100,
|
|
result = $3,
|
|
completed_at = now()
|
|
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
|
`, jobID, runnerID, resultJSON)
|
|
if err != nil {
|
|
return fmt.Errorf("completing job: %w", err)
|
|
}
|
|
if res.RowsAffected() == 0 {
|
|
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FailJob marks a job as failed with an error message.
|
|
func (r *JobRepository) FailJob(ctx context.Context, jobID, runnerID string, errMsg string) error {
|
|
res, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET
|
|
status = 'failed',
|
|
error_message = $3,
|
|
completed_at = now()
|
|
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
|
`, jobID, runnerID, errMsg)
|
|
if err != nil {
|
|
return fmt.Errorf("failing job: %w", err)
|
|
}
|
|
if res.RowsAffected() == 0 {
|
|
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CancelJob cancels a pending or active job.
|
|
func (r *JobRepository) CancelJob(ctx context.Context, jobID string, cancelledBy string) error {
|
|
res, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET
|
|
status = 'cancelled',
|
|
cancelled_by = $2,
|
|
completed_at = now()
|
|
WHERE id = $1 AND status IN ('pending', 'claimed', 'running')
|
|
`, jobID, cancelledBy)
|
|
if err != nil {
|
|
return fmt.Errorf("cancelling job: %w", err)
|
|
}
|
|
if res.RowsAffected() == 0 {
|
|
return fmt.Errorf("job %s not cancellable", jobID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TimeoutExpiredJobs marks expired claimed/running jobs as failed.
|
|
// Returns the number of jobs timed out.
|
|
func (r *JobRepository) TimeoutExpiredJobs(ctx context.Context) (int64, error) {
|
|
result, err := r.db.pool.Exec(ctx, `
|
|
UPDATE jobs SET
|
|
status = 'failed',
|
|
error_message = 'job timed out',
|
|
completed_at = now()
|
|
WHERE status IN ('claimed', 'running')
|
|
AND expires_at IS NOT NULL
|
|
AND expires_at < now()
|
|
`)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("timing out expired jobs: %w", err)
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Job Log
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// AppendLog adds a log entry to a job.
|
|
func (r *JobRepository) AppendLog(ctx context.Context, entry *JobLogEntry) error {
|
|
metaJSON, err := json.Marshal(entry.Metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling log metadata: %w", err)
|
|
}
|
|
|
|
err = r.db.pool.QueryRow(ctx, `
|
|
INSERT INTO job_log (job_id, level, message, metadata)
|
|
VALUES ($1, $2, $3, $4)
|
|
RETURNING id, timestamp
|
|
`, entry.JobID, entry.Level, entry.Message, metaJSON,
|
|
).Scan(&entry.ID, &entry.Timestamp)
|
|
if err != nil {
|
|
return fmt.Errorf("appending job log: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetJobLogs returns all log entries for a job.
|
|
func (r *JobRepository) GetJobLogs(ctx context.Context, jobID string) ([]*JobLogEntry, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, job_id, timestamp, level, message, metadata
|
|
FROM job_log WHERE job_id = $1 ORDER BY timestamp ASC
|
|
`, jobID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying job logs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var entries []*JobLogEntry
|
|
for rows.Next() {
|
|
e := &JobLogEntry{}
|
|
var metaJSON []byte
|
|
if err := rows.Scan(&e.ID, &e.JobID, &e.Timestamp, &e.Level, &e.Message, &metaJSON); err != nil {
|
|
return nil, fmt.Errorf("scanning job log: %w", err)
|
|
}
|
|
if metaJSON != nil {
|
|
if err := json.Unmarshal(metaJSON, &e.Metadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling log metadata: %w", err)
|
|
}
|
|
}
|
|
entries = append(entries, e)
|
|
}
|
|
return entries, rows.Err()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Runners
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// RegisterRunner creates a new runner record.
|
|
func (r *JobRepository) RegisterRunner(ctx context.Context, runner *Runner) error {
|
|
metaJSON, err := json.Marshal(runner.Metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling runner metadata: %w", err)
|
|
}
|
|
|
|
err = r.db.pool.QueryRow(ctx, `
|
|
INSERT INTO runners (name, token_hash, token_prefix, tags, status, metadata)
|
|
VALUES ($1, $2, $3, $4, 'offline', $5)
|
|
RETURNING id, created_at, updated_at
|
|
`, runner.Name, runner.TokenHash, runner.TokenPrefix, runner.Tags, metaJSON,
|
|
).Scan(&runner.ID, &runner.CreatedAt, &runner.UpdatedAt)
|
|
if err != nil {
|
|
return fmt.Errorf("registering runner: %w", err)
|
|
}
|
|
runner.Status = "offline"
|
|
return nil
|
|
}
|
|
|
|
// GetRunnerByToken looks up a runner by token hash.
|
|
func (r *JobRepository) GetRunnerByToken(ctx context.Context, tokenHash string) (*Runner, error) {
|
|
runner := &Runner{}
|
|
var metaJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, name, token_hash, token_prefix, tags, status,
|
|
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
|
FROM runners WHERE token_hash = $1
|
|
`, tokenHash).Scan(
|
|
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
|
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
|
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying runner by token: %w", err)
|
|
}
|
|
if metaJSON != nil {
|
|
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
|
}
|
|
}
|
|
return runner, nil
|
|
}
|
|
|
|
// GetRunner returns a runner by ID.
|
|
func (r *JobRepository) GetRunner(ctx context.Context, runnerID string) (*Runner, error) {
|
|
runner := &Runner{}
|
|
var metaJSON []byte
|
|
err := r.db.pool.QueryRow(ctx, `
|
|
SELECT id, name, token_hash, token_prefix, tags, status,
|
|
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
|
FROM runners WHERE id = $1
|
|
`, runnerID).Scan(
|
|
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
|
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
|
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying runner: %w", err)
|
|
}
|
|
if metaJSON != nil {
|
|
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
|
}
|
|
}
|
|
return runner, nil
|
|
}
|
|
|
|
// Heartbeat updates a runner's heartbeat timestamp and sets status to online.
|
|
func (r *JobRepository) Heartbeat(ctx context.Context, runnerID string) error {
|
|
res, err := r.db.pool.Exec(ctx, `
|
|
UPDATE runners SET
|
|
status = 'online',
|
|
last_heartbeat = now(),
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
`, runnerID)
|
|
if err != nil {
|
|
return fmt.Errorf("updating heartbeat: %w", err)
|
|
}
|
|
if res.RowsAffected() == 0 {
|
|
return fmt.Errorf("runner %s not found", runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListRunners returns all registered runners.
|
|
func (r *JobRepository) ListRunners(ctx context.Context) ([]*Runner, error) {
|
|
rows, err := r.db.pool.Query(ctx, `
|
|
SELECT id, name, token_hash, token_prefix, tags, status,
|
|
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
|
FROM runners ORDER BY name
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying runners: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var runners []*Runner
|
|
for rows.Next() {
|
|
runner := &Runner{}
|
|
var metaJSON []byte
|
|
if err := rows.Scan(
|
|
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
|
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
|
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scanning runner: %w", err)
|
|
}
|
|
if metaJSON != nil {
|
|
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
|
}
|
|
}
|
|
runners = append(runners, runner)
|
|
}
|
|
return runners, rows.Err()
|
|
}
|
|
|
|
// DeleteRunner removes a runner by ID.
|
|
func (r *JobRepository) DeleteRunner(ctx context.Context, runnerID string) error {
|
|
res, err := r.db.pool.Exec(ctx, `DELETE FROM runners WHERE id = $1`, runnerID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting runner: %w", err)
|
|
}
|
|
if res.RowsAffected() == 0 {
|
|
return fmt.Errorf("runner %s not found", runnerID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ExpireStaleRunners marks runners with no recent heartbeat as offline.
|
|
func (r *JobRepository) ExpireStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) {
|
|
result, err := r.db.pool.Exec(ctx, `
|
|
UPDATE runners SET status = 'offline', updated_at = now()
|
|
WHERE status = 'online'
|
|
AND last_heartbeat < now() - $1::interval
|
|
`, timeout.String())
|
|
if err != nil {
|
|
return 0, fmt.Errorf("expiring stale runners: %w", err)
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func scanJobDefinitions(rows pgx.Rows) ([]*JobDefinitionRecord, error) {
|
|
var defs []*JobDefinitionRecord
|
|
for rows.Next() {
|
|
d := &JobDefinitionRecord{}
|
|
var defJSON []byte
|
|
if err := rows.Scan(
|
|
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
|
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
|
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scanning job definition: %w", err)
|
|
}
|
|
if defJSON != nil {
|
|
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
|
}
|
|
}
|
|
defs = append(defs, d)
|
|
}
|
|
return defs, rows.Err()
|
|
}
|
|
|
|
func scanJobs(rows pgx.Rows) ([]*Job, error) {
|
|
var jobs []*Job
|
|
for rows.Next() {
|
|
j := &Job{}
|
|
var scopeJSON, resultJSON []byte
|
|
if err := rows.Scan(
|
|
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority,
|
|
&j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags,
|
|
&j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt,
|
|
&j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage,
|
|
&resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries,
|
|
&j.CreatedBy, &j.CancelledBy,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scanning job: %w", err)
|
|
}
|
|
if scopeJSON != nil {
|
|
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
|
}
|
|
}
|
|
if resultJSON != nil {
|
|
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
|
}
|
|
}
|
|
jobs = append(jobs, j)
|
|
}
|
|
return jobs, rows.Err()
|
|
}
|