feat: add job and runner repository with atomic claim
This commit is contained in:
759
internal/db/jobs.go
Normal file
759
internal/db/jobs.go
Normal file
@@ -0,0 +1,759 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
// Runner represents a registered compute worker.
|
||||
type Runner struct {
|
||||
ID string
|
||||
Name string
|
||||
TokenHash string
|
||||
TokenPrefix string
|
||||
Tags []string
|
||||
Status string
|
||||
LastHeartbeat *time.Time
|
||||
LastJobID *string
|
||||
Metadata map[string]any
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
// JobDefinitionRecord is a job definition stored in the database.
|
||||
type JobDefinitionRecord struct {
|
||||
ID string
|
||||
Name string
|
||||
Version int
|
||||
TriggerType string
|
||||
ScopeType string
|
||||
ComputeType string
|
||||
RunnerTags []string
|
||||
TimeoutSeconds int
|
||||
MaxRetries int
|
||||
Priority int
|
||||
Definition map[string]any
|
||||
Enabled bool
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
// Job represents a single compute job instance.
|
||||
type Job struct {
|
||||
ID string
|
||||
JobDefinitionID *string
|
||||
DefinitionName string
|
||||
Status string
|
||||
Priority int
|
||||
ItemID *string
|
||||
ProjectID *string
|
||||
ScopeMetadata map[string]any
|
||||
RunnerID *string
|
||||
RunnerTags []string
|
||||
CreatedAt time.Time
|
||||
ClaimedAt *time.Time
|
||||
StartedAt *time.Time
|
||||
CompletedAt *time.Time
|
||||
TimeoutSeconds int
|
||||
ExpiresAt *time.Time
|
||||
Progress int
|
||||
ProgressMessage *string
|
||||
Result map[string]any
|
||||
ErrorMessage *string
|
||||
RetryCount int
|
||||
MaxRetries int
|
||||
CreatedBy *string
|
||||
CancelledBy *string
|
||||
}
|
||||
|
||||
// JobLogEntry is a single log line for a job.
|
||||
type JobLogEntry struct {
|
||||
ID string
|
||||
JobID string
|
||||
Timestamp time.Time
|
||||
Level string
|
||||
Message string
|
||||
Metadata map[string]any
|
||||
}
|
||||
|
||||
// JobRepository provides job and runner database operations.
|
||||
type JobRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
// NewJobRepository creates a new job repository.
|
||||
func NewJobRepository(db *DB) *JobRepository {
|
||||
return &JobRepository{db: db}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Job Definitions
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// UpsertDefinition inserts or updates a job definition record.
|
||||
func (r *JobRepository) UpsertDefinition(ctx context.Context, d *JobDefinitionRecord) error {
|
||||
defJSON, err := json.Marshal(d.Definition)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling definition: %w", err)
|
||||
}
|
||||
|
||||
err = r.db.pool.QueryRow(ctx, `
|
||||
INSERT INTO job_definitions (name, version, trigger_type, scope_type, compute_type,
|
||||
runner_tags, timeout_seconds, max_retries, priority,
|
||||
definition, enabled)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
ON CONFLICT (name) DO UPDATE SET
|
||||
version = EXCLUDED.version,
|
||||
trigger_type = EXCLUDED.trigger_type,
|
||||
scope_type = EXCLUDED.scope_type,
|
||||
compute_type = EXCLUDED.compute_type,
|
||||
runner_tags = EXCLUDED.runner_tags,
|
||||
timeout_seconds = EXCLUDED.timeout_seconds,
|
||||
max_retries = EXCLUDED.max_retries,
|
||||
priority = EXCLUDED.priority,
|
||||
definition = EXCLUDED.definition,
|
||||
enabled = EXCLUDED.enabled,
|
||||
updated_at = now()
|
||||
RETURNING id, created_at, updated_at
|
||||
`, d.Name, d.Version, d.TriggerType, d.ScopeType, d.ComputeType,
|
||||
d.RunnerTags, d.TimeoutSeconds, d.MaxRetries, d.Priority,
|
||||
defJSON, d.Enabled,
|
||||
).Scan(&d.ID, &d.CreatedAt, &d.UpdatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upserting job definition: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDefinition returns a job definition by name.
|
||||
func (r *JobRepository) GetDefinition(ctx context.Context, name string) (*JobDefinitionRecord, error) {
|
||||
d := &JobDefinitionRecord{}
|
||||
var defJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
||||
runner_tags, timeout_seconds, max_retries, priority,
|
||||
definition, enabled, created_at, updated_at
|
||||
FROM job_definitions WHERE name = $1
|
||||
`, name).Scan(
|
||||
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
||||
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
||||
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying job definition: %w", err)
|
||||
}
|
||||
if defJSON != nil {
|
||||
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
||||
}
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// ListDefinitions returns all job definitions.
|
||||
func (r *JobRepository) ListDefinitions(ctx context.Context) ([]*JobDefinitionRecord, error) {
|
||||
rows, err := r.db.pool.Query(ctx, `
|
||||
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
||||
runner_tags, timeout_seconds, max_retries, priority,
|
||||
definition, enabled, created_at, updated_at
|
||||
FROM job_definitions ORDER BY name
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying job definitions: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
return scanJobDefinitions(rows)
|
||||
}
|
||||
|
||||
// GetDefinitionsByTrigger returns all enabled definitions matching a trigger type.
|
||||
func (r *JobRepository) GetDefinitionsByTrigger(ctx context.Context, triggerType string) ([]*JobDefinitionRecord, error) {
|
||||
rows, err := r.db.pool.Query(ctx, `
|
||||
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
||||
runner_tags, timeout_seconds, max_retries, priority,
|
||||
definition, enabled, created_at, updated_at
|
||||
FROM job_definitions
|
||||
WHERE trigger_type = $1 AND enabled = true
|
||||
ORDER BY priority ASC, name
|
||||
`, triggerType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying definitions by trigger: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
return scanJobDefinitions(rows)
|
||||
}
|
||||
|
||||
// GetDefinitionByID returns a job definition by ID.
|
||||
func (r *JobRepository) GetDefinitionByID(ctx context.Context, id string) (*JobDefinitionRecord, error) {
|
||||
d := &JobDefinitionRecord{}
|
||||
var defJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
SELECT id, name, version, trigger_type, scope_type, compute_type,
|
||||
runner_tags, timeout_seconds, max_retries, priority,
|
||||
definition, enabled, created_at, updated_at
|
||||
FROM job_definitions WHERE id = $1
|
||||
`, id).Scan(
|
||||
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
||||
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
||||
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying job definition by ID: %w", err)
|
||||
}
|
||||
if defJSON != nil {
|
||||
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
||||
}
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Jobs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// CreateJob inserts a new job.
|
||||
func (r *JobRepository) CreateJob(ctx context.Context, j *Job) error {
|
||||
scopeJSON, err := json.Marshal(j.ScopeMetadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling scope metadata: %w", err)
|
||||
}
|
||||
|
||||
err = r.db.pool.QueryRow(ctx, `
|
||||
INSERT INTO jobs (job_definition_id, definition_name, status, priority,
|
||||
item_id, project_id, scope_metadata,
|
||||
runner_tags, timeout_seconds, max_retries, created_by)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
RETURNING id, created_at
|
||||
`, j.JobDefinitionID, j.DefinitionName, "pending", j.Priority,
|
||||
j.ItemID, j.ProjectID, scopeJSON,
|
||||
j.RunnerTags, j.TimeoutSeconds, j.MaxRetries, j.CreatedBy,
|
||||
).Scan(&j.ID, &j.CreatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating job: %w", err)
|
||||
}
|
||||
j.Status = "pending"
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJob returns a job by ID.
|
||||
func (r *JobRepository) GetJob(ctx context.Context, jobID string) (*Job, error) {
|
||||
j := &Job{}
|
||||
var scopeJSON, resultJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
SELECT id, job_definition_id, definition_name, status, priority,
|
||||
item_id, project_id, scope_metadata, runner_id, runner_tags,
|
||||
created_at, claimed_at, started_at, completed_at,
|
||||
timeout_seconds, expires_at, progress, progress_message,
|
||||
result, error_message, retry_count, max_retries,
|
||||
created_by, cancelled_by
|
||||
FROM jobs WHERE id = $1
|
||||
`, jobID).Scan(
|
||||
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority,
|
||||
&j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags,
|
||||
&j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt,
|
||||
&j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage,
|
||||
&resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries,
|
||||
&j.CreatedBy, &j.CancelledBy,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying job: %w", err)
|
||||
}
|
||||
if scopeJSON != nil {
|
||||
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if resultJSON != nil {
|
||||
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
||||
}
|
||||
}
|
||||
return j, nil
|
||||
}
|
||||
|
||||
// ListJobs returns jobs matching optional filters.
|
||||
func (r *JobRepository) ListJobs(ctx context.Context, status, itemID string, limit, offset int) ([]*Job, error) {
|
||||
query := `
|
||||
SELECT id, job_definition_id, definition_name, status, priority,
|
||||
item_id, project_id, scope_metadata, runner_id, runner_tags,
|
||||
created_at, claimed_at, started_at, completed_at,
|
||||
timeout_seconds, expires_at, progress, progress_message,
|
||||
result, error_message, retry_count, max_retries,
|
||||
created_by, cancelled_by
|
||||
FROM jobs WHERE 1=1`
|
||||
args := []any{}
|
||||
argN := 1
|
||||
|
||||
if status != "" {
|
||||
query += fmt.Sprintf(" AND status = $%d", argN)
|
||||
args = append(args, status)
|
||||
argN++
|
||||
}
|
||||
if itemID != "" {
|
||||
query += fmt.Sprintf(" AND item_id = $%d", argN)
|
||||
args = append(args, itemID)
|
||||
argN++
|
||||
}
|
||||
|
||||
query += " ORDER BY created_at DESC"
|
||||
|
||||
if limit > 0 {
|
||||
query += fmt.Sprintf(" LIMIT $%d", argN)
|
||||
args = append(args, limit)
|
||||
argN++
|
||||
}
|
||||
if offset > 0 {
|
||||
query += fmt.Sprintf(" OFFSET $%d", argN)
|
||||
args = append(args, offset)
|
||||
}
|
||||
|
||||
rows, err := r.db.pool.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
return scanJobs(rows)
|
||||
}
|
||||
|
||||
// ClaimJob atomically claims the next available job matching the runner's tags.
|
||||
// Uses SELECT FOR UPDATE SKIP LOCKED for exactly-once delivery.
|
||||
func (r *JobRepository) ClaimJob(ctx context.Context, runnerID string, tags []string) (*Job, error) {
|
||||
j := &Job{}
|
||||
var scopeJSON, resultJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
WITH claimable AS (
|
||||
SELECT id FROM jobs
|
||||
WHERE status = 'pending' AND runner_tags <@ $2::text[]
|
||||
ORDER BY priority ASC, created_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
UPDATE jobs SET
|
||||
status = 'claimed',
|
||||
runner_id = $1,
|
||||
claimed_at = now(),
|
||||
expires_at = now() + (timeout_seconds || ' seconds')::interval
|
||||
FROM claimable
|
||||
WHERE jobs.id = claimable.id
|
||||
RETURNING jobs.id, jobs.job_definition_id, jobs.definition_name, jobs.status,
|
||||
jobs.priority, jobs.item_id, jobs.project_id, jobs.scope_metadata,
|
||||
jobs.runner_id, jobs.runner_tags, jobs.created_at, jobs.claimed_at,
|
||||
jobs.started_at, jobs.completed_at, jobs.timeout_seconds, jobs.expires_at,
|
||||
jobs.progress, jobs.progress_message, jobs.result, jobs.error_message,
|
||||
jobs.retry_count, jobs.max_retries, jobs.created_by, jobs.cancelled_by
|
||||
`, runnerID, tags).Scan(
|
||||
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status,
|
||||
&j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON,
|
||||
&j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt,
|
||||
&j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt,
|
||||
&j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage,
|
||||
&j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claiming job: %w", err)
|
||||
}
|
||||
if scopeJSON != nil {
|
||||
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if resultJSON != nil {
|
||||
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
||||
}
|
||||
}
|
||||
return j, nil
|
||||
}
|
||||
|
||||
// StartJob transitions a claimed job to running.
|
||||
func (r *JobRepository) StartJob(ctx context.Context, jobID, runnerID string) error {
|
||||
result, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET status = 'running', started_at = now()
|
||||
WHERE id = $1 AND runner_id = $2 AND status = 'claimed'
|
||||
`, jobID, runnerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("starting job: %w", err)
|
||||
}
|
||||
if result.RowsAffected() == 0 {
|
||||
return fmt.Errorf("job %s not claimable by runner %s or not in claimed state", jobID, runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateProgress updates a running job's progress.
|
||||
func (r *JobRepository) UpdateProgress(ctx context.Context, jobID, runnerID string, progress int, message string) error {
|
||||
var msg *string
|
||||
if message != "" {
|
||||
msg = &message
|
||||
}
|
||||
result, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET progress = $3, progress_message = $4
|
||||
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
||||
`, jobID, runnerID, progress, msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updating progress: %w", err)
|
||||
}
|
||||
if result.RowsAffected() == 0 {
|
||||
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CompleteJob marks a job as completed with optional result data.
|
||||
func (r *JobRepository) CompleteJob(ctx context.Context, jobID, runnerID string, resultData map[string]any) error {
|
||||
var resultJSON []byte
|
||||
var err error
|
||||
if resultData != nil {
|
||||
resultJSON, err = json.Marshal(resultData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling result: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
res, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET
|
||||
status = 'completed',
|
||||
progress = 100,
|
||||
result = $3,
|
||||
completed_at = now()
|
||||
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
||||
`, jobID, runnerID, resultJSON)
|
||||
if err != nil {
|
||||
return fmt.Errorf("completing job: %w", err)
|
||||
}
|
||||
if res.RowsAffected() == 0 {
|
||||
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FailJob marks a job as failed with an error message.
|
||||
func (r *JobRepository) FailJob(ctx context.Context, jobID, runnerID string, errMsg string) error {
|
||||
res, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET
|
||||
status = 'failed',
|
||||
error_message = $3,
|
||||
completed_at = now()
|
||||
WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running')
|
||||
`, jobID, runnerID, errMsg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failing job: %w", err)
|
||||
}
|
||||
if res.RowsAffected() == 0 {
|
||||
return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelJob cancels a pending or active job.
|
||||
func (r *JobRepository) CancelJob(ctx context.Context, jobID string, cancelledBy string) error {
|
||||
res, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET
|
||||
status = 'cancelled',
|
||||
cancelled_by = $2,
|
||||
completed_at = now()
|
||||
WHERE id = $1 AND status IN ('pending', 'claimed', 'running')
|
||||
`, jobID, cancelledBy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cancelling job: %w", err)
|
||||
}
|
||||
if res.RowsAffected() == 0 {
|
||||
return fmt.Errorf("job %s not cancellable", jobID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TimeoutExpiredJobs marks expired claimed/running jobs as failed.
|
||||
// Returns the number of jobs timed out.
|
||||
func (r *JobRepository) TimeoutExpiredJobs(ctx context.Context) (int64, error) {
|
||||
result, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE jobs SET
|
||||
status = 'failed',
|
||||
error_message = 'job timed out',
|
||||
completed_at = now()
|
||||
WHERE status IN ('claimed', 'running')
|
||||
AND expires_at IS NOT NULL
|
||||
AND expires_at < now()
|
||||
`)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("timing out expired jobs: %w", err)
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Job Log
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// AppendLog adds a log entry to a job.
|
||||
func (r *JobRepository) AppendLog(ctx context.Context, entry *JobLogEntry) error {
|
||||
metaJSON, err := json.Marshal(entry.Metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling log metadata: %w", err)
|
||||
}
|
||||
|
||||
err = r.db.pool.QueryRow(ctx, `
|
||||
INSERT INTO job_log (job_id, level, message, metadata)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id, timestamp
|
||||
`, entry.JobID, entry.Level, entry.Message, metaJSON,
|
||||
).Scan(&entry.ID, &entry.Timestamp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("appending job log: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJobLogs returns all log entries for a job.
|
||||
func (r *JobRepository) GetJobLogs(ctx context.Context, jobID string) ([]*JobLogEntry, error) {
|
||||
rows, err := r.db.pool.Query(ctx, `
|
||||
SELECT id, job_id, timestamp, level, message, metadata
|
||||
FROM job_log WHERE job_id = $1 ORDER BY timestamp ASC
|
||||
`, jobID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying job logs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var entries []*JobLogEntry
|
||||
for rows.Next() {
|
||||
e := &JobLogEntry{}
|
||||
var metaJSON []byte
|
||||
if err := rows.Scan(&e.ID, &e.JobID, &e.Timestamp, &e.Level, &e.Message, &metaJSON); err != nil {
|
||||
return nil, fmt.Errorf("scanning job log: %w", err)
|
||||
}
|
||||
if metaJSON != nil {
|
||||
if err := json.Unmarshal(metaJSON, &e.Metadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling log metadata: %w", err)
|
||||
}
|
||||
}
|
||||
entries = append(entries, e)
|
||||
}
|
||||
return entries, rows.Err()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Runners
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// RegisterRunner creates a new runner record.
|
||||
func (r *JobRepository) RegisterRunner(ctx context.Context, runner *Runner) error {
|
||||
metaJSON, err := json.Marshal(runner.Metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling runner metadata: %w", err)
|
||||
}
|
||||
|
||||
err = r.db.pool.QueryRow(ctx, `
|
||||
INSERT INTO runners (name, token_hash, token_prefix, tags, status, metadata)
|
||||
VALUES ($1, $2, $3, $4, 'offline', $5)
|
||||
RETURNING id, created_at, updated_at
|
||||
`, runner.Name, runner.TokenHash, runner.TokenPrefix, runner.Tags, metaJSON,
|
||||
).Scan(&runner.ID, &runner.CreatedAt, &runner.UpdatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("registering runner: %w", err)
|
||||
}
|
||||
runner.Status = "offline"
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRunnerByToken looks up a runner by token hash.
|
||||
func (r *JobRepository) GetRunnerByToken(ctx context.Context, tokenHash string) (*Runner, error) {
|
||||
runner := &Runner{}
|
||||
var metaJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
SELECT id, name, token_hash, token_prefix, tags, status,
|
||||
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
||||
FROM runners WHERE token_hash = $1
|
||||
`, tokenHash).Scan(
|
||||
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
||||
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
||||
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying runner by token: %w", err)
|
||||
}
|
||||
if metaJSON != nil {
|
||||
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
||||
}
|
||||
}
|
||||
return runner, nil
|
||||
}
|
||||
|
||||
// GetRunner returns a runner by ID.
|
||||
func (r *JobRepository) GetRunner(ctx context.Context, runnerID string) (*Runner, error) {
|
||||
runner := &Runner{}
|
||||
var metaJSON []byte
|
||||
err := r.db.pool.QueryRow(ctx, `
|
||||
SELECT id, name, token_hash, token_prefix, tags, status,
|
||||
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
||||
FROM runners WHERE id = $1
|
||||
`, runnerID).Scan(
|
||||
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
||||
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
||||
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
||||
)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying runner: %w", err)
|
||||
}
|
||||
if metaJSON != nil {
|
||||
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
||||
}
|
||||
}
|
||||
return runner, nil
|
||||
}
|
||||
|
||||
// Heartbeat updates a runner's heartbeat timestamp and sets status to online.
|
||||
func (r *JobRepository) Heartbeat(ctx context.Context, runnerID string) error {
|
||||
res, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE runners SET
|
||||
status = 'online',
|
||||
last_heartbeat = now(),
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
`, runnerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updating heartbeat: %w", err)
|
||||
}
|
||||
if res.RowsAffected() == 0 {
|
||||
return fmt.Errorf("runner %s not found", runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListRunners returns all registered runners.
|
||||
func (r *JobRepository) ListRunners(ctx context.Context) ([]*Runner, error) {
|
||||
rows, err := r.db.pool.Query(ctx, `
|
||||
SELECT id, name, token_hash, token_prefix, tags, status,
|
||||
last_heartbeat, last_job_id, metadata, created_at, updated_at
|
||||
FROM runners ORDER BY name
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying runners: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var runners []*Runner
|
||||
for rows.Next() {
|
||||
runner := &Runner{}
|
||||
var metaJSON []byte
|
||||
if err := rows.Scan(
|
||||
&runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix,
|
||||
&runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID,
|
||||
&metaJSON, &runner.CreatedAt, &runner.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scanning runner: %w", err)
|
||||
}
|
||||
if metaJSON != nil {
|
||||
if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling runner metadata: %w", err)
|
||||
}
|
||||
}
|
||||
runners = append(runners, runner)
|
||||
}
|
||||
return runners, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteRunner removes a runner by ID.
|
||||
func (r *JobRepository) DeleteRunner(ctx context.Context, runnerID string) error {
|
||||
res, err := r.db.pool.Exec(ctx, `DELETE FROM runners WHERE id = $1`, runnerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleting runner: %w", err)
|
||||
}
|
||||
if res.RowsAffected() == 0 {
|
||||
return fmt.Errorf("runner %s not found", runnerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExpireStaleRunners marks runners with no recent heartbeat as offline.
|
||||
func (r *JobRepository) ExpireStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) {
|
||||
result, err := r.db.pool.Exec(ctx, `
|
||||
UPDATE runners SET status = 'offline', updated_at = now()
|
||||
WHERE status = 'online'
|
||||
AND last_heartbeat < now() - $1::interval
|
||||
`, timeout.String())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("expiring stale runners: %w", err)
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func scanJobDefinitions(rows pgx.Rows) ([]*JobDefinitionRecord, error) {
|
||||
var defs []*JobDefinitionRecord
|
||||
for rows.Next() {
|
||||
d := &JobDefinitionRecord{}
|
||||
var defJSON []byte
|
||||
if err := rows.Scan(
|
||||
&d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType,
|
||||
&d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority,
|
||||
&defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scanning job definition: %w", err)
|
||||
}
|
||||
if defJSON != nil {
|
||||
if err := json.Unmarshal(defJSON, &d.Definition); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling definition: %w", err)
|
||||
}
|
||||
}
|
||||
defs = append(defs, d)
|
||||
}
|
||||
return defs, rows.Err()
|
||||
}
|
||||
|
||||
func scanJobs(rows pgx.Rows) ([]*Job, error) {
|
||||
var jobs []*Job
|
||||
for rows.Next() {
|
||||
j := &Job{}
|
||||
var scopeJSON, resultJSON []byte
|
||||
if err := rows.Scan(
|
||||
&j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority,
|
||||
&j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags,
|
||||
&j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt,
|
||||
&j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage,
|
||||
&resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries,
|
||||
&j.CreatedBy, &j.CancelledBy,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scanning job: %w", err)
|
||||
}
|
||||
if scopeJSON != nil {
|
||||
if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling scope metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if resultJSON != nil {
|
||||
if err := json.Unmarshal(resultJSON, &j.Result); err != nil {
|
||||
return nil, fmt.Errorf("unmarshaling result: %w", err)
|
||||
}
|
||||
}
|
||||
jobs = append(jobs, j)
|
||||
}
|
||||
return jobs, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user