package db

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

// Runner represents a registered compute worker.
type Runner struct {
	// ID is assigned by the database when the runner is inserted.
	ID   string
	Name string
	// TokenHash is the lookup key used by GetRunnerByToken.
	TokenHash   string
	TokenPrefix string
	Tags        []string
	// Status is written as 'offline' by RegisterRunner and ExpireStaleRunners
	// and as 'online' by Heartbeat.
	Status string
	// LastHeartbeat is set to the database's now() by Heartbeat.
	LastHeartbeat *time.Time
	// LastJobID is read from runners.last_job_id; nothing in this file writes it.
	LastJobID *string
	// Metadata is stored as a JSON document.
	Metadata  map[string]any
	CreatedAt time.Time
	UpdatedAt time.Time
}

// JobDefinitionRecord is a job definition stored in the database.
type JobDefinitionRecord struct {
	ID string
	// Name is the upsert conflict key (ON CONFLICT (name)).
	Name           string
	Version        int
	TriggerType    string
	ScopeType      string
	ComputeType    string
	RunnerTags     []string
	TimeoutSeconds int
	MaxRetries     int
	// Priority orders results of GetDefinitionsByTrigger ascending.
	Priority int
	// Definition is stored as a JSON document.
	Definition map[string]any
	// Enabled gates GetDefinitionsByTrigger, which returns enabled rows only.
	Enabled   bool
	CreatedAt time.Time
	UpdatedAt time.Time
}

// Job represents a single compute job instance.
type Job struct {
	ID              string
	JobDefinitionID *string
	DefinitionName  string
	// Status values written by this file: 'pending' (CreateJob), 'claimed'
	// (ClaimJob), 'running' (StartJob), 'completed' (CompleteJob), 'failed'
	// (FailJob, TimeoutExpiredJobs) and 'cancelled' (CancelJob).
	Status string
	// Priority is sorted ascending by ClaimJob, so lower values are claimed first.
	Priority  int
	ItemID    *string
	ProjectID *string
	// ScopeMetadata is stored as a JSON document; ListSolverJobs filters on its
	// "operation" key.
	ScopeMetadata map[string]any
	RunnerID      *string
	// RunnerTags must be contained in the claiming runner's tags (ClaimJob
	// uses the array operator <@).
	RunnerTags     []string
	CreatedAt      time.Time
	ClaimedAt      *time.Time
	StartedAt      *time.Time
	CompletedAt    *time.Time
	TimeoutSeconds int
	// ExpiresAt is set by ClaimJob to the claim time plus TimeoutSeconds.
	ExpiresAt *time.Time
	// Progress is forced to 100 by CompleteJob; UpdateProgress stores the
	// caller's value as given.
	Progress        int
	ProgressMessage *string
	// Result is stored as a JSON document.
	Result       map[string]any
	ErrorMessage *string
	// RetryCount is only read in this file; no query here increments it.
	RetryCount  int
	MaxRetries  int
	CreatedBy   *string
	CancelledBy *string
}

// JobLogEntry is a single log line for a job.
type JobLogEntry struct {
	// ID and Timestamp are assigned by the database in AppendLog.
	ID        string
	JobID     string
	Timestamp time.Time
	Level     string
	Message   string
	// Metadata is stored as a JSON document.
	Metadata map[string]any
}

// JobRepository provides job and runner database operations.
type JobRepository struct {
	// db is the shared database handle; queries go through db.pool.
	db *DB
}

// NewJobRepository creates a new job repository.
func NewJobRepository(db *DB) *JobRepository {
	return &JobRepository{db: db}
}

// ---------------------------------------------------------------------------
// Job Definitions
// ---------------------------------------------------------------------------

// UpsertDefinition inserts or updates a job definition record.
func (r *JobRepository) UpsertDefinition(ctx context.Context, d *JobDefinitionRecord) error { defJSON, err := json.Marshal(d.Definition) if err != nil { return fmt.Errorf("marshaling definition: %w", err) } err = r.db.pool.QueryRow(ctx, ` INSERT INTO job_definitions (name, version, trigger_type, scope_type, compute_type, runner_tags, timeout_seconds, max_retries, priority, definition, enabled) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (name) DO UPDATE SET version = EXCLUDED.version, trigger_type = EXCLUDED.trigger_type, scope_type = EXCLUDED.scope_type, compute_type = EXCLUDED.compute_type, runner_tags = EXCLUDED.runner_tags, timeout_seconds = EXCLUDED.timeout_seconds, max_retries = EXCLUDED.max_retries, priority = EXCLUDED.priority, definition = EXCLUDED.definition, enabled = EXCLUDED.enabled, updated_at = now() RETURNING id, created_at, updated_at `, d.Name, d.Version, d.TriggerType, d.ScopeType, d.ComputeType, d.RunnerTags, d.TimeoutSeconds, d.MaxRetries, d.Priority, defJSON, d.Enabled, ).Scan(&d.ID, &d.CreatedAt, &d.UpdatedAt) if err != nil { return fmt.Errorf("upserting job definition: %w", err) } return nil } // GetDefinition returns a job definition by name. 
func (r *JobRepository) GetDefinition(ctx context.Context, name string) (*JobDefinitionRecord, error) { d := &JobDefinitionRecord{} var defJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT id, name, version, trigger_type, scope_type, compute_type, runner_tags, timeout_seconds, max_retries, priority, definition, enabled, created_at, updated_at FROM job_definitions WHERE name = $1 `, name).Scan( &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying job definition: %w", err) } if defJSON != nil { if err := json.Unmarshal(defJSON, &d.Definition); err != nil { return nil, fmt.Errorf("unmarshaling definition: %w", err) } } return d, nil } // ListDefinitions returns all job definitions. func (r *JobRepository) ListDefinitions(ctx context.Context) ([]*JobDefinitionRecord, error) { rows, err := r.db.pool.Query(ctx, ` SELECT id, name, version, trigger_type, scope_type, compute_type, runner_tags, timeout_seconds, max_retries, priority, definition, enabled, created_at, updated_at FROM job_definitions ORDER BY name `) if err != nil { return nil, fmt.Errorf("querying job definitions: %w", err) } defer rows.Close() return scanJobDefinitions(rows) } // GetDefinitionsByTrigger returns all enabled definitions matching a trigger type. 
func (r *JobRepository) GetDefinitionsByTrigger(ctx context.Context, triggerType string) ([]*JobDefinitionRecord, error) { rows, err := r.db.pool.Query(ctx, ` SELECT id, name, version, trigger_type, scope_type, compute_type, runner_tags, timeout_seconds, max_retries, priority, definition, enabled, created_at, updated_at FROM job_definitions WHERE trigger_type = $1 AND enabled = true ORDER BY priority ASC, name `, triggerType) if err != nil { return nil, fmt.Errorf("querying definitions by trigger: %w", err) } defer rows.Close() return scanJobDefinitions(rows) } // GetDefinitionByID returns a job definition by ID. func (r *JobRepository) GetDefinitionByID(ctx context.Context, id string) (*JobDefinitionRecord, error) { d := &JobDefinitionRecord{} var defJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT id, name, version, trigger_type, scope_type, compute_type, runner_tags, timeout_seconds, max_retries, priority, definition, enabled, created_at, updated_at FROM job_definitions WHERE id = $1 `, id).Scan( &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying job definition by ID: %w", err) } if defJSON != nil { if err := json.Unmarshal(defJSON, &d.Definition); err != nil { return nil, fmt.Errorf("unmarshaling definition: %w", err) } } return d, nil } // --------------------------------------------------------------------------- // Jobs // --------------------------------------------------------------------------- // CreateJob inserts a new job. 
func (r *JobRepository) CreateJob(ctx context.Context, j *Job) error { scopeJSON, err := json.Marshal(j.ScopeMetadata) if err != nil { return fmt.Errorf("marshaling scope metadata: %w", err) } err = r.db.pool.QueryRow(ctx, ` INSERT INTO jobs (job_definition_id, definition_name, status, priority, item_id, project_id, scope_metadata, runner_tags, timeout_seconds, max_retries, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, created_at `, j.JobDefinitionID, j.DefinitionName, "pending", j.Priority, j.ItemID, j.ProjectID, scopeJSON, j.RunnerTags, j.TimeoutSeconds, j.MaxRetries, j.CreatedBy, ).Scan(&j.ID, &j.CreatedAt) if err != nil { return fmt.Errorf("creating job: %w", err) } j.Status = "pending" return nil } // GetJob returns a job by ID. func (r *JobRepository) GetJob(ctx context.Context, jobID string) (*Job, error) { j := &Job{} var scopeJSON, resultJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT id, job_definition_id, definition_name, status, priority, item_id, project_id, scope_metadata, runner_id, runner_tags, created_at, claimed_at, started_at, completed_at, timeout_seconds, expires_at, progress, progress_message, result, error_message, retry_count, max_retries, created_by, cancelled_by FROM jobs WHERE id = $1 `, jobID).Scan( &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying job: %w", err) } if scopeJSON != nil { if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) } } if resultJSON != nil { if err := json.Unmarshal(resultJSON, &j.Result); err != nil { return nil, 
fmt.Errorf("unmarshaling result: %w", err) } } return j, nil } // ListJobs returns jobs matching optional filters. func (r *JobRepository) ListJobs(ctx context.Context, status, itemID string, limit, offset int) ([]*Job, error) { query := ` SELECT id, job_definition_id, definition_name, status, priority, item_id, project_id, scope_metadata, runner_id, runner_tags, created_at, claimed_at, started_at, completed_at, timeout_seconds, expires_at, progress, progress_message, result, error_message, retry_count, max_retries, created_by, cancelled_by FROM jobs WHERE 1=1` args := []any{} argN := 1 if status != "" { query += fmt.Sprintf(" AND status = $%d", argN) args = append(args, status) argN++ } if itemID != "" { query += fmt.Sprintf(" AND item_id = $%d", argN) args = append(args, itemID) argN++ } query += " ORDER BY created_at DESC" if limit > 0 { query += fmt.Sprintf(" LIMIT $%d", argN) args = append(args, limit) argN++ } if offset > 0 { query += fmt.Sprintf(" OFFSET $%d", argN) args = append(args, offset) } rows, err := r.db.pool.Query(ctx, query, args...) if err != nil { return nil, fmt.Errorf("querying jobs: %w", err) } defer rows.Close() return scanJobs(rows) } // ListSolverJobs returns solver jobs (definition_name LIKE 'assembly-%') with optional filters. 
func (r *JobRepository) ListSolverJobs(ctx context.Context, status, itemID, operation string, limit, offset int) ([]*Job, error) { query := ` SELECT id, job_definition_id, definition_name, status, priority, item_id, project_id, scope_metadata, runner_id, runner_tags, created_at, claimed_at, started_at, completed_at, timeout_seconds, expires_at, progress, progress_message, result, error_message, retry_count, max_retries, created_by, cancelled_by FROM jobs WHERE definition_name LIKE 'assembly-%'` args := []any{} argN := 1 if status != "" { query += fmt.Sprintf(" AND status = $%d", argN) args = append(args, status) argN++ } if itemID != "" { query += fmt.Sprintf(" AND item_id = $%d", argN) args = append(args, itemID) argN++ } if operation != "" { query += fmt.Sprintf(" AND scope_metadata->>'operation' = $%d", argN) args = append(args, operation) argN++ } query += " ORDER BY created_at DESC" if limit > 0 { query += fmt.Sprintf(" LIMIT $%d", argN) args = append(args, limit) argN++ } if offset > 0 { query += fmt.Sprintf(" OFFSET $%d", argN) args = append(args, offset) } rows, err := r.db.pool.Query(ctx, query, args...) if err != nil { return nil, fmt.Errorf("querying solver jobs: %w", err) } defer rows.Close() return scanJobs(rows) } // ClaimJob atomically claims the next available job matching the runner's tags. // Uses SELECT FOR UPDATE SKIP LOCKED for exactly-once delivery. 
func (r *JobRepository) ClaimJob(ctx context.Context, runnerID string, tags []string) (*Job, error) { j := &Job{} var scopeJSON, resultJSON []byte err := r.db.pool.QueryRow(ctx, ` WITH claimable AS ( SELECT id FROM jobs WHERE status = 'pending' AND runner_tags <@ $2::text[] ORDER BY priority ASC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) UPDATE jobs SET status = 'claimed', runner_id = $1, claimed_at = now(), expires_at = now() + (timeout_seconds || ' seconds')::interval FROM claimable WHERE jobs.id = claimable.id RETURNING jobs.id, jobs.job_definition_id, jobs.definition_name, jobs.status, jobs.priority, jobs.item_id, jobs.project_id, jobs.scope_metadata, jobs.runner_id, jobs.runner_tags, jobs.created_at, jobs.claimed_at, jobs.started_at, jobs.completed_at, jobs.timeout_seconds, jobs.expires_at, jobs.progress, jobs.progress_message, jobs.result, jobs.error_message, jobs.retry_count, jobs.max_retries, jobs.created_by, jobs.cancelled_by `, runnerID, tags).Scan( &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("claiming job: %w", err) } if scopeJSON != nil { if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) } } if resultJSON != nil { if err := json.Unmarshal(resultJSON, &j.Result); err != nil { return nil, fmt.Errorf("unmarshaling result: %w", err) } } return j, nil } // StartJob transitions a claimed job to running. 
func (r *JobRepository) StartJob(ctx context.Context, jobID, runnerID string) error { result, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET status = 'running', started_at = now() WHERE id = $1 AND runner_id = $2 AND status = 'claimed' `, jobID, runnerID) if err != nil { return fmt.Errorf("starting job: %w", err) } if result.RowsAffected() == 0 { return fmt.Errorf("job %s not claimable by runner %s or not in claimed state", jobID, runnerID) } return nil } // UpdateProgress updates a running job's progress. func (r *JobRepository) UpdateProgress(ctx context.Context, jobID, runnerID string, progress int, message string) error { var msg *string if message != "" { msg = &message } result, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET progress = $3, progress_message = $4 WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') `, jobID, runnerID, progress, msg) if err != nil { return fmt.Errorf("updating progress: %w", err) } if result.RowsAffected() == 0 { return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) } return nil } // CompleteJob marks a job as completed with optional result data. func (r *JobRepository) CompleteJob(ctx context.Context, jobID, runnerID string, resultData map[string]any) error { var resultJSON []byte var err error if resultData != nil { resultJSON, err = json.Marshal(resultData) if err != nil { return fmt.Errorf("marshaling result: %w", err) } } res, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET status = 'completed', progress = 100, result = $3, completed_at = now() WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') `, jobID, runnerID, resultJSON) if err != nil { return fmt.Errorf("completing job: %w", err) } if res.RowsAffected() == 0 { return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) } return nil } // FailJob marks a job as failed with an error message. 
func (r *JobRepository) FailJob(ctx context.Context, jobID, runnerID string, errMsg string) error { res, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET status = 'failed', error_message = $3, completed_at = now() WHERE id = $1 AND runner_id = $2 AND status IN ('claimed', 'running') `, jobID, runnerID, errMsg) if err != nil { return fmt.Errorf("failing job: %w", err) } if res.RowsAffected() == 0 { return fmt.Errorf("job %s not owned by runner %s or not active", jobID, runnerID) } return nil } // CancelJob cancels a pending or active job. func (r *JobRepository) CancelJob(ctx context.Context, jobID string, cancelledBy string) error { res, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET status = 'cancelled', cancelled_by = $2, completed_at = now() WHERE id = $1 AND status IN ('pending', 'claimed', 'running') `, jobID, cancelledBy) if err != nil { return fmt.Errorf("cancelling job: %w", err) } if res.RowsAffected() == 0 { return fmt.Errorf("job %s not cancellable", jobID) } return nil } // TimeoutExpiredJobs marks expired claimed/running jobs as failed. // Returns the number of jobs timed out. func (r *JobRepository) TimeoutExpiredJobs(ctx context.Context) (int64, error) { result, err := r.db.pool.Exec(ctx, ` UPDATE jobs SET status = 'failed', error_message = 'job timed out', completed_at = now() WHERE status IN ('claimed', 'running') AND expires_at IS NOT NULL AND expires_at < now() `) if err != nil { return 0, fmt.Errorf("timing out expired jobs: %w", err) } return result.RowsAffected(), nil } // --------------------------------------------------------------------------- // Job Log // --------------------------------------------------------------------------- // AppendLog adds a log entry to a job. 
func (r *JobRepository) AppendLog(ctx context.Context, entry *JobLogEntry) error { metaJSON, err := json.Marshal(entry.Metadata) if err != nil { return fmt.Errorf("marshaling log metadata: %w", err) } err = r.db.pool.QueryRow(ctx, ` INSERT INTO job_log (job_id, level, message, metadata) VALUES ($1, $2, $3, $4) RETURNING id, timestamp `, entry.JobID, entry.Level, entry.Message, metaJSON, ).Scan(&entry.ID, &entry.Timestamp) if err != nil { return fmt.Errorf("appending job log: %w", err) } return nil } // GetJobLogs returns all log entries for a job. func (r *JobRepository) GetJobLogs(ctx context.Context, jobID string) ([]*JobLogEntry, error) { rows, err := r.db.pool.Query(ctx, ` SELECT id, job_id, timestamp, level, message, metadata FROM job_log WHERE job_id = $1 ORDER BY timestamp ASC `, jobID) if err != nil { return nil, fmt.Errorf("querying job logs: %w", err) } defer rows.Close() var entries []*JobLogEntry for rows.Next() { e := &JobLogEntry{} var metaJSON []byte if err := rows.Scan(&e.ID, &e.JobID, &e.Timestamp, &e.Level, &e.Message, &metaJSON); err != nil { return nil, fmt.Errorf("scanning job log: %w", err) } if metaJSON != nil { if err := json.Unmarshal(metaJSON, &e.Metadata); err != nil { return nil, fmt.Errorf("unmarshaling log metadata: %w", err) } } entries = append(entries, e) } return entries, rows.Err() } // --------------------------------------------------------------------------- // Runners // --------------------------------------------------------------------------- // RegisterRunner creates a new runner record. 
func (r *JobRepository) RegisterRunner(ctx context.Context, runner *Runner) error { metaJSON, err := json.Marshal(runner.Metadata) if err != nil { return fmt.Errorf("marshaling runner metadata: %w", err) } err = r.db.pool.QueryRow(ctx, ` INSERT INTO runners (name, token_hash, token_prefix, tags, status, metadata) VALUES ($1, $2, $3, $4, 'offline', $5) RETURNING id, created_at, updated_at `, runner.Name, runner.TokenHash, runner.TokenPrefix, runner.Tags, metaJSON, ).Scan(&runner.ID, &runner.CreatedAt, &runner.UpdatedAt) if err != nil { return fmt.Errorf("registering runner: %w", err) } runner.Status = "offline" return nil } // GetRunnerByToken looks up a runner by token hash. func (r *JobRepository) GetRunnerByToken(ctx context.Context, tokenHash string) (*Runner, error) { runner := &Runner{} var metaJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT id, name, token_hash, token_prefix, tags, status, last_heartbeat, last_job_id, metadata, created_at, updated_at FROM runners WHERE token_hash = $1 `, tokenHash).Scan( &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying runner by token: %w", err) } if metaJSON != nil { if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) } } return runner, nil } // GetRunner returns a runner by ID. 
func (r *JobRepository) GetRunner(ctx context.Context, runnerID string) (*Runner, error) { runner := &Runner{} var metaJSON []byte err := r.db.pool.QueryRow(ctx, ` SELECT id, name, token_hash, token_prefix, tags, status, last_heartbeat, last_job_id, metadata, created_at, updated_at FROM runners WHERE id = $1 `, runnerID).Scan( &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying runner: %w", err) } if metaJSON != nil { if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) } } return runner, nil } // Heartbeat updates a runner's heartbeat timestamp and sets status to online. func (r *JobRepository) Heartbeat(ctx context.Context, runnerID string) error { res, err := r.db.pool.Exec(ctx, ` UPDATE runners SET status = 'online', last_heartbeat = now(), updated_at = now() WHERE id = $1 `, runnerID) if err != nil { return fmt.Errorf("updating heartbeat: %w", err) } if res.RowsAffected() == 0 { return fmt.Errorf("runner %s not found", runnerID) } return nil } // ListRunners returns all registered runners. 
func (r *JobRepository) ListRunners(ctx context.Context) ([]*Runner, error) { rows, err := r.db.pool.Query(ctx, ` SELECT id, name, token_hash, token_prefix, tags, status, last_heartbeat, last_job_id, metadata, created_at, updated_at FROM runners ORDER BY name `) if err != nil { return nil, fmt.Errorf("querying runners: %w", err) } defer rows.Close() var runners []*Runner for rows.Next() { runner := &Runner{} var metaJSON []byte if err := rows.Scan( &runner.ID, &runner.Name, &runner.TokenHash, &runner.TokenPrefix, &runner.Tags, &runner.Status, &runner.LastHeartbeat, &runner.LastJobID, &metaJSON, &runner.CreatedAt, &runner.UpdatedAt, ); err != nil { return nil, fmt.Errorf("scanning runner: %w", err) } if metaJSON != nil { if err := json.Unmarshal(metaJSON, &runner.Metadata); err != nil { return nil, fmt.Errorf("unmarshaling runner metadata: %w", err) } } runners = append(runners, runner) } return runners, rows.Err() } // DeleteRunner removes a runner by ID. func (r *JobRepository) DeleteRunner(ctx context.Context, runnerID string) error { res, err := r.db.pool.Exec(ctx, `DELETE FROM runners WHERE id = $1`, runnerID) if err != nil { return fmt.Errorf("deleting runner: %w", err) } if res.RowsAffected() == 0 { return fmt.Errorf("runner %s not found", runnerID) } return nil } // ExpireStaleRunners marks runners with no recent heartbeat as offline. 
func (r *JobRepository) ExpireStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) { result, err := r.db.pool.Exec(ctx, ` UPDATE runners SET status = 'offline', updated_at = now() WHERE status = 'online' AND last_heartbeat < now() - $1::interval `, timeout.String()) if err != nil { return 0, fmt.Errorf("expiring stale runners: %w", err) } return result.RowsAffected(), nil } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- func scanJobDefinitions(rows pgx.Rows) ([]*JobDefinitionRecord, error) { var defs []*JobDefinitionRecord for rows.Next() { d := &JobDefinitionRecord{} var defJSON []byte if err := rows.Scan( &d.ID, &d.Name, &d.Version, &d.TriggerType, &d.ScopeType, &d.ComputeType, &d.RunnerTags, &d.TimeoutSeconds, &d.MaxRetries, &d.Priority, &defJSON, &d.Enabled, &d.CreatedAt, &d.UpdatedAt, ); err != nil { return nil, fmt.Errorf("scanning job definition: %w", err) } if defJSON != nil { if err := json.Unmarshal(defJSON, &d.Definition); err != nil { return nil, fmt.Errorf("unmarshaling definition: %w", err) } } defs = append(defs, d) } return defs, rows.Err() } func scanJobs(rows pgx.Rows) ([]*Job, error) { var jobs []*Job for rows.Next() { j := &Job{} var scopeJSON, resultJSON []byte if err := rows.Scan( &j.ID, &j.JobDefinitionID, &j.DefinitionName, &j.Status, &j.Priority, &j.ItemID, &j.ProjectID, &scopeJSON, &j.RunnerID, &j.RunnerTags, &j.CreatedAt, &j.ClaimedAt, &j.StartedAt, &j.CompletedAt, &j.TimeoutSeconds, &j.ExpiresAt, &j.Progress, &j.ProgressMessage, &resultJSON, &j.ErrorMessage, &j.RetryCount, &j.MaxRetries, &j.CreatedBy, &j.CancelledBy, ); err != nil { return nil, fmt.Errorf("scanning job: %w", err) } if scopeJSON != nil { if err := json.Unmarshal(scopeJSON, &j.ScopeMetadata); err != nil { return nil, fmt.Errorf("unmarshaling scope metadata: %w", err) } } if resultJSON != nil { if err := json.Unmarshal(resultJSON, 
&j.Result); err != nil { return nil, fmt.Errorf("unmarshaling result: %w", err) } } jobs = append(jobs, j) } return jobs, rows.Err() }