diff --git a/internal/db/jobs.go b/internal/db/jobs.go new file mode 100644 index 0000000..ef36edf --- /dev/null +++ b/internal/db/jobs.go @@ -0,0 +1,759 @@ +package db + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/jackc/pgx/v5" +) + +// Runner represents a registered compute worker. +type Runner struct { + ID string + Name string + TokenHash string + TokenPrefix string + Tags []string + Status string + LastHeartbeat *time.Time + LastJobID *string + Metadata map[string]any + CreatedAt time.Time + UpdatedAt time.Time +} + +// JobDefinitionRecord is a job definition stored in the database. +type JobDefinitionRecord struct { + ID string + Name string + Version int + TriggerType string + ScopeType string + ComputeType string + RunnerTags []string + TimeoutSeconds int + MaxRetries int + Priority int + Definition map[string]any + Enabled bool + CreatedAt time.Time + UpdatedAt time.Time +} + +// Job represents a single compute job instance. +type Job struct { + ID string + JobDefinitionID *string + DefinitionName string + Status string + Priority int + ItemID *string + ProjectID *string + ScopeMetadata map[string]any + RunnerID *string + RunnerTags []string + CreatedAt time.Time + ClaimedAt *time.Time + StartedAt *time.Time + CompletedAt *time.Time + TimeoutSeconds int + ExpiresAt *time.Time + Progress int + ProgressMessage *string + Result map[string]any + ErrorMessage *string + RetryCount int + MaxRetries int + CreatedBy *string + CancelledBy *string +} + +// JobLogEntry is a single log line for a job. +type JobLogEntry struct { + ID string + JobID string + Timestamp time.Time + Level string + Message string + Metadata map[string]any +} + +// JobRepository provides job and runner database operations. +type JobRepository struct { + db *DB +} + +// NewJobRepository creates a new job repository. +func NewJobRepository(db *DB) *JobRepository { + return &JobRepository{db: db} +} + +// --------------------------------------------------------------------------- +// Job Definitions +// --------------------------------------------------------------------------- + +// UpsertDefinition inserts or updates a job definition record. +func (r *JobRepository) UpsertDefinition(ctx context.Context, d *JobDefinitionRecord) error { + defJSON, err := json.Marshal(d.Definition) + if err != nil { + return fmt.Errorf("marshaling definition: %w", err) + } + + err = r.db.pool.QueryRow(ctx, ` + INSERT INTO job_definitions (name, version, trigger_type, scope_type, compute_type, + runner_tags, timeout_seconds, max_retries, priority, + definition, enabled) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (name) DO UPDATE SET + version = EXCLUDED.version, + trigger_type = EXCLUDED.trigger_type, + scope_type = EXCLUDED.scope_type, + compute_type = EXCLUDED.compute_type, + runner_tags = EXCLUDED.runner_tags, + timeout_seconds = EXCLUDED.timeout_seconds, + max_retries = EXCLUDED.max_retries, + priority = EXCLUDED.priority, + definition = EXCLUDED.definition, + enabled = EXCLUDED.enabled, + updated_at = now() + RETURNING id, created_at, updated_at + `, d.Name, d.Version, d.TriggerType, d.ScopeType, d.ComputeType, + d.RunnerTags, d.TimeoutSeconds, d.MaxRetries, d.Priority, + defJSON, d.Enabled, + ).Scan(&d.ID, &d.CreatedAt, &d.UpdatedAt) + if err != nil { + return fmt.Errorf("upserting job definition: %w", err) + } + return nil +} + +// GetDefinition returns a job definition by name. +func (r *JobRepository) GetDefinition(ctx context.Context, name string) (*JobDefinitionRecord, error) { + d := &JobDefinitionRecord{} + var defJSON []byte + err := r.db.pool.QueryRow(ctx, ` + SELECT id, name, version, trigger_type, scope_type, compute_type, + runner_tags, timeout_seconds, max_retries, priority, + definition, enabled, created_at, updated_at + FROM job_definitions WHERE name = $1 + `, name).Scan( + &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, + &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, + &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying job definition: %w", err) + } + if defJSON != nil { + if err := json.Unmarshal(defJSON, &d.Definition); err != nil { + return nil, fmt.Errorf("unmarshaling definition: %w", err) + } + } + return d, nil +} + +// ListDefinitions returns all job definitions. +func (r *JobRepository) ListDefinitions(ctx context.Context) ([]*JobDefinitionRecord, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, name, version, trigger_type, scope_type, compute_type, + runner_tags, timeout_seconds, max_retries, priority, + definition, enabled, created_at, updated_at + FROM job_definitions ORDER BY name + `) + if err != nil { + return nil, fmt.Errorf("querying job definitions: %w", err) + } + defer rows.Close() + return scanJobDefinitions(rows) +} + +// GetDefinitionsByTrigger returns all enabled definitions matching a trigger type. +func (r *JobRepository) GetDefinitionsByTrigger(ctx context.Context, triggerType string) ([]*JobDefinitionRecord, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, name, version, trigger_type, scope_type, compute_type, + runner_tags, timeout_seconds, max_retries, priority, + definition, enabled, created_at, updated_at + FROM job_definitions + WHERE trigger_type = $1 AND enabled = true + ORDER BY priority ASC, name + `, triggerType) + if err != nil { + return nil, fmt.Errorf("querying definitions by trigger: %w", err) + } + defer rows.Close() + return scanJobDefinitions(rows) +} + +// GetDefinitionByID returns a job definition by ID. +func (r *JobRepository) GetDefinitionByID(ctx context.Context, id string) (*JobDefinitionRecord, error) { + d := &JobDefinitionRecord{} + var defJSON []byte + err := r.db.pool.QueryRow(ctx, ` + SELECT id, name, version, trigger_type, scope_type, compute_type, + runner_tags, timeout_seconds, max_retries, priority, + definition, enabled, created_at, updated_at + FROM job_definitions WHERE id = $1 + `, id).Scan( + &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, + &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, + &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying job definition by ID: %w", err) + } + if defJSON != nil { + if err := json.Unmarshal(defJSON, &d.Definition); err != nil { + return nil, fmt.Errorf("unmarshaling definition: %w", err) + } + } + return d, nil +} + +// --------------------------------------------------------------------------- +// Jobs +// --------------------------------------------------------------------------- + +// CreateJob inserts a new job. +func (r *JobRepository) CreateJob(ctx context.Context, j *Job) error { + scopeJSON, err := json.Marshal(j.ScopeMetadata) + if err != nil { + return fmt.Errorf("marshaling scope metadata: %w", err) + } + + err = r.db.pool.QueryRow(ctx, ` + INSERT INTO jobs (job_definition_id, definition_name, status, priority, + item_id, project_id, scope_metadata, + runner_tags, timeout_seconds, max_retries, created_by) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + RETURNING id, created_at + `, j.JobDefinitionID, j.DefinitionName, "pending", j.Priority, + j.ItemID, j.ProjectID, scopeJSON, + j.RunnerTags, j.TimeoutSeconds, j.MaxRetries, j.CreatedBy, + ).Scan(&j.ID, &j.CreatedAt) + if err != nil { + return fmt.Errorf("creating job: %w", err) + } + j.Status = "pending" + return nil +} + +// GetJob returns a job by ID. +func (r *JobRepository) GetJob(ctx context.Context, jobID string) (*Job, error) { + j := &Job{} + var scopeJSON, resultJSON []byte + err := r.db.pool.QueryRow(ctx, ` + SELECT id, job_definition_id, definition_name, status, priority, + item_id, project_id, scope_metadata, runner_id, runner_tags, + created_at, claimed_at, started_at, completed_at, + timeout_seconds, expires_at, progress, progress_message, + result, error_message, retry_count, max_retries, + created_by, cancelled_by + FROM jobs WHERE id = $1 + `, jobID).Scan( + &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority, + &j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags, + &j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt, + &j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage, + &resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries, + &j.CreatedBy, &j.CancelledBy, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying job: %w", err) + } + if scopeJSON != nil { + if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { + return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) + } + } + if resultJSON != nil { + if err := json.Unmarshal(resultJSON, &j.Result); err != nil { + return nil, fmt.Errorf("unmarshaling result: %w", err) + } + } + return j, nil +} + +// ListJobs returns jobs matching optional filters. +func (r *JobRepository) ListJobs(ctx context.Context, status, itemID string, limit, offset int) ([]*Job, error) { + query := ` + SELECT id, job_definition_id, definition_name, status, priority, + item_id, project_id, scope_metadata, runner_id, runner_tags, + created_at, claimed_at, started_at, completed_at, + timeout_seconds, expires_at, progress, progress_message, + result, error_message, retry_count, max_retries, + created_by, cancelled_by + FROM jobs WHERE 1=1` + args := []any{} + argN := 1 + + if status != "" { + query += fmt.Sprintf(" AND status = $%d", argN) + args = append(args, status) + argN++ + } + if itemID != "" { + query += fmt.Sprintf(" AND item_id = $%d", argN) + args = append(args, itemID) + argN++ + } + + query += " ORDER BY created_at DESC" + + if limit > 0 { + query += fmt.Sprintf(" LIMIT $%d", argN) + args = append(args, limit) + argN++ + } + if offset > 0 { + query += fmt.Sprintf(" OFFSET $%d", argN) + args = append(args, offset) + } + + rows, err := r.db.pool.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("querying jobs: %w", err) + } + defer rows.Close() + return scanJobs(rows) +} + +// ClaimJob atomically claims the next available job matching the runner's tags. +// Uses SELECT FOR UPDATE SKIP LOCKED for exactly-once delivery. +func (r *JobRepository) ClaimJob(ctx context.Context, runnerID string, tags []string) (*Job, error) { + j := &Job{} + var scopeJSON, resultJSON []byte + err := r.db.pool.QueryRow(ctx, ` + WITH claimable AS ( + SELECT id FROM jobs + WHERE status = 'pending' AND runner_tags <@ $2::text[] + ORDER BY priority ASC, created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE jobs SET + status = 'claimed', + runner_id = $1, + claimed_at = now(), + expires_at = now() + (timeout_seconds || ' seconds')::interval + FROM claimable + WHERE jobs.id = claimable.id + RETURNING jobs.id, jobs.job_definition_id, jobs.definition_name, jobs.status, + jobs.priority, jobs.item_id, jobs.project_id, jobs.scope_metadata, + jobs.runner_id, jobs.runner_tags, jobs.created_at, jobs.claimed_at, + jobs.started_at, jobs.completed_at, jobs.timeout_seconds, jobs.expires_at, + jobs.progress, jobs.progress_message, jobs.result, jobs.error_message, + jobs.retry_count, jobs.max_retries, jobs.created_by, jobs.cancelled_by + `, runnerID, tags).Scan( + &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, + &j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON, + &j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt, + &j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt, + &j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage, + &j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("claiming job: %w", err) + } + if scopeJSON != nil { + if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { + return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) + } + } + if resultJSON != nil { + if err := json.Unmarshal(resultJSON, &j.Result); err != nil { + return nil, fmt.Errorf("unmarshaling result: %w", err) + } + } + return j, nil +} + +// StartJob transitions a claimed job to running. +func (r *JobRepository) StartJob(ctx context.Context, jobID, runnerID string) error { + result, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET status = 'running', started_at = now() + WHERE id = $1 AND runner_id = $2 AND status = 'claimed' + `, jobID, runnerID) + if err != nil { + return fmt.Errorf("starting job: %w", err) + } + if result.RowsAffected() == 0 { + return fmt.Errorf("job %s not claimable by runner %s or not in claimed state", jobID, runnerID) + } + return nil +} + +// UpdateProgress updates a running job's progress. +func (r *JobRepository) UpdateProgress(ctx context.Context, jobID, runnerID string, progress int, message string) error { + var msg *string + if message != "" { + msg = &message + } + result, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET progress = $3, progress_message = $4 + WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') + `, jobID, runnerID, progress, msg) + if err != nil { + return fmt.Errorf("updating progress: %w", err) + } + if result.RowsAffected() == 0 { + return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) + } + return nil +} + +// CompleteJob marks a job as completed with optional result data. +func (r *JobRepository) CompleteJob(ctx context.Context, jobID, runnerID string, resultData map[string]any) error { + var resultJSON []byte + var err error + if resultData != nil { + resultJSON, err = json.Marshal(resultData) + if err != nil { + return fmt.Errorf("marshaling result: %w", err) + } + } + + res, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET + status = 'completed', + progress = 100, + result = $3, + completed_at = now() + WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') + `, jobID, runnerID, resultJSON) + if err != nil { + return fmt.Errorf("completing job: %w", err) + } + if res.RowsAffected() == 0 { + return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) + } + return nil +} + +// FailJob marks a job as failed with an error message. +func (r *JobRepository) FailJob(ctx context.Context, jobID, runnerID string, errMsg string) error { + res, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET + status = 'failed', + error_message = $3, + completed_at = now() + WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') + `, jobID, runnerID, errMsg) + if err != nil { + return fmt.Errorf("failing job: %w", err) + } + if res.RowsAffected() == 0 { + return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) + } + return nil +} + +// CancelJob cancels a pending or active job. +func (r *JobRepository) CancelJob(ctx context.Context, jobID string, cancelledBy string) error { + res, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET + status = 'cancelled', + cancelled_by = $2, + completed_at = now() + WHERE id = $1 AND status IN ('pending', 'claimed', 'running') + `, jobID, cancelledBy) + if err != nil { + return fmt.Errorf("cancelling job: %w", err) + } + if res.RowsAffected() == 0 { + return fmt.Errorf("job %s not cancellable", jobID) + } + return nil +} + +// TimeoutExpiredJobs marks expired claimed/running jobs as failed. +// Returns the number of jobs timed out. +func (r *JobRepository) TimeoutExpiredJobs(ctx context.Context) (int64, error) { + result, err := r.db.pool.Exec(ctx, ` + UPDATE jobs SET + status = 'failed', + error_message = 'job timed out', + completed_at = now() + WHERE status IN ('claimed', 'running') + AND expires_at IS NOT NULL + AND expires_at < now() + `) + if err != nil { + return 0, fmt.Errorf("timing out expired jobs: %w", err) + } + return result.RowsAffected(), nil +} + +// --------------------------------------------------------------------------- +// Job Log +// --------------------------------------------------------------------------- + +// AppendLog adds a log entry to a job. +func (r *JobRepository) AppendLog(ctx context.Context, entry *JobLogEntry) error { + metaJSON, err := json.Marshal(entry.Metadata) + if err != nil { + return fmt.Errorf("marshaling log metadata: %w", err) + } + + err = r.db.pool.QueryRow(ctx, ` + INSERT INTO job_log (job_id, level, message, metadata) + VALUES ($1, $2, $3, $4) + RETURNING id, timestamp + `, entry.JobID, entry.Level, entry.Message, metaJSON, + ).Scan(&entry.ID, &entry.Timestamp) + if err != nil { + return fmt.Errorf("appending job log: %w", err) + } + return nil +} + +// GetJobLogs returns all log entries for a job. +func (r *JobRepository) GetJobLogs(ctx context.Context, jobID string) ([]*JobLogEntry, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, job_id, timestamp, level, message, metadata + FROM job_log WHERE job_id = $1 ORDER BY timestamp ASC + `, jobID) + if err != nil { + return nil, fmt.Errorf("querying job logs: %w", err) + } + defer rows.Close() + + var entries []*JobLogEntry + for rows.Next() { + e := &JobLogEntry{} + var metaJSON []byte + if err := rows.Scan(&e.ID, &e.JobID, &e.Timestamp, &e.Level, &e.Message, &metaJSON); err != nil { + return nil, fmt.Errorf("scanning job log: %w", err) + } + if metaJSON != nil { + if err := json.Unmarshal(metaJSON, &e.Metadata); err != nil { + return nil, fmt.Errorf("unmarshaling log metadata: %w", err) + } + } + entries = append(entries, e) + } + return entries, rows.Err() +} + +// --------------------------------------------------------------------------- +// Runners +// --------------------------------------------------------------------------- + +// RegisterRunner creates a new runner record. +func (r *JobRepository) RegisterRunner(ctx context.Context, runner *Runner) error { + metaJSON, err := json.Marshal(runner.Metadata) + if err != nil { + return fmt.Errorf("marshaling runner metadata: %w", err) + } + + err = r.db.pool.QueryRow(ctx, ` + INSERT INTO runners (name, token_hash, token_prefix, tags, status, metadata) + VALUES ($1, $2, $3, $4, 'offline', $5) + RETURNING id, created_at, updated_at + `, runner.Name, runner.TokenHash, runner.TokenPrefix, runner.Tags, metaJSON, + ).Scan(&runner.ID, &runner.CreatedAt, &runner.UpdatedAt) + if err != nil { + return fmt.Errorf("registering runner: %w", err) + } + runner.Status = "offline" + return nil +} + +// GetRunnerByToken looks up a runner by token hash. +func (r *JobRepository) GetRunnerByToken(ctx context.Context, tokenHash string) (*Runner, error) { + runner := &Runner{} + var metaJSON []byte + err := r.db.pool.QueryRow(ctx, ` + SELECT id, name, token_hash, token_prefix, tags, status, + last_heartbeat, last_job_id, metadata, created_at, updated_at + FROM runners WHERE token_hash = $1 + `, tokenHash).Scan( + &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, + &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, + &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying runner by token: %w", err) + } + if metaJSON != nil { + if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { + return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) + } + } + return runner, nil +} + +// GetRunner returns a runner by ID. +func (r *JobRepository) GetRunner(ctx context.Context, runnerID string) (*Runner, error) { + runner := &Runner{} + var metaJSON []byte + err := r.db.pool.QueryRow(ctx, ` + SELECT id, name, token_hash, token_prefix, tags, status, + last_heartbeat, last_job_id, metadata, created_at, updated_at + FROM runners WHERE id = $1 + `, runnerID).Scan( + &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, + &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, + &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying runner: %w", err) + } + if metaJSON != nil { + if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { + return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) + } + } + return runner, nil +} + +// Heartbeat updates a runner's heartbeat timestamp and sets status to online. +func (r *JobRepository) Heartbeat(ctx context.Context, runnerID string) error { + res, err := r.db.pool.Exec(ctx, ` + UPDATE runners SET + status = 'online', + last_heartbeat = now(), + updated_at = now() + WHERE id = $1 + `, runnerID) + if err != nil { + return fmt.Errorf("updating heartbeat: %w", err) + } + if res.RowsAffected() == 0 { + return fmt.Errorf("runner %s not found", runnerID) + } + return nil +} + +// ListRunners returns all registered runners. +func (r *JobRepository) ListRunners(ctx context.Context) ([]*Runner, error) { + rows, err := r.db.pool.Query(ctx, ` + SELECT id, name, token_hash, token_prefix, tags, status, + last_heartbeat, last_job_id, metadata, created_at, updated_at + FROM runners ORDER BY name + `) + if err != nil { + return nil, fmt.Errorf("querying runners: %w", err) + } + defer rows.Close() + + var runners []*Runner + for rows.Next() { + runner := &Runner{} + var metaJSON []byte + if err := rows.Scan( + &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, + &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, + &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, + ); err != nil { + return nil, fmt.Errorf("scanning runner: %w", err) + } + if metaJSON != nil { + if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { + return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) + } + } + runners = append(runners, runner) + } + return runners, rows.Err() +} + +// DeleteRunner removes a runner by ID. +func (r *JobRepository) DeleteRunner(ctx context.Context, runnerID string) error { + res, err := r.db.pool.Exec(ctx, `DELETE FROM runners WHERE id = $1`, runnerID) + if err != nil { + return fmt.Errorf("deleting runner: %w", err) + } + if res.RowsAffected() == 0 { + return fmt.Errorf("runner %s not found", runnerID) + } + return nil +} + +// ExpireStaleRunners marks runners with no recent heartbeat as offline. +func (r *JobRepository) ExpireStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) { + result, err := r.db.pool.Exec(ctx, ` + UPDATE runners SET status = 'offline', updated_at = now() + WHERE status = 'online' + AND last_heartbeat < now() - $1::interval + `, timeout.String()) + if err != nil { + return 0, fmt.Errorf("expiring stale runners: %w", err) + } + return result.RowsAffected(), nil +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func scanJobDefinitions(rows pgx.Rows) ([]*JobDefinitionRecord, error) { + var defs []*JobDefinitionRecord + for rows.Next() { + d := &JobDefinitionRecord{} + var defJSON []byte + if err := rows.Scan( + &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, + &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, + &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, + ); err != nil { + return nil, fmt.Errorf("scanning job definition: %w", err) + } + if defJSON != nil { + if err := json.Unmarshal(defJSON, &d.Definition); err != nil { + return nil, fmt.Errorf("unmarshaling definition: %w", err) + } + } + defs = append(defs, d) + } + return defs, rows.Err() +} + +func scanJobs(rows pgx.Rows) ([]*Job, error) { + var jobs []*Job + for rows.Next() { + j := &Job{} + var scopeJSON, resultJSON []byte + if err := rows.Scan( + &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority, + &j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags, + &j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt, + &j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage, + &resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries, + &j.CreatedBy, &j.CancelledBy, + ); err != nil { + return nil, fmt.Errorf("scanning job: %w", err) + } + if scopeJSON != nil { + if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { + return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) + } + } + if resultJSON != nil { + if err := json.Unmarshal(resultJSON, &j.Result); err != nil { + return nil, fmt.Errorf("unmarshaling result: %w", err) + } + } + jobs = append(jobs, j) + } + return jobs, rows.Err() +}