feat: dependency DAG and YAML-defined compute jobs #92

Merged
forbes merged 13 commits from feat-dag-workers into main 2026-02-14 19:27:19 +00:00
3 changed files with 178 additions and 0 deletions
Showing only changes of commit 83e0d6821c - Show all commits

View File

@@ -80,6 +80,8 @@ func TruncateAll(t *testing.T, pool *pgxpool.Pool) {
_, err := pool.Exec(context.Background(), `
TRUNCATE
job_log, jobs, job_definitions, runners,
dag_cross_edges, dag_edges, dag_nodes,
audit_log, sync_log, api_tokens, sessions, item_files,
item_projects, relationships, revisions, inventory, items,
projects, sequences_by_name, users, property_migrations

View File

@@ -0,0 +1,67 @@
-- Dependency DAG: feature-level nodes and edges within items.
-- Migration: 014_dag_nodes_edges
-- Date: 2026-02
BEGIN;
--------------------------------------------------------------------------------
-- dag_nodes: feature-level nodes belonging to one revision of an item.
-- A node is addressed by (item_id, revision_number, node_key), which is
-- enforced as unique. Rows are deleted together with their owning item.
--------------------------------------------------------------------------------
CREATE TABLE dag_nodes (
    id               UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    item_id          UUID        NOT NULL REFERENCES items (id) ON DELETE CASCADE,
    revision_number  INTEGER     NOT NULL,
    node_key         TEXT        NOT NULL,
    node_type        TEXT        NOT NULL,
    properties_hash  TEXT,
    validation_state TEXT        NOT NULL DEFAULT 'clean',
    validation_msg   TEXT,
    metadata         JSONB       DEFAULT '{}',
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (item_id, revision_number, node_key)
);

-- Lookups by owning item, and by item + revision.
-- NOTE(review): both share a leading-column prefix with the index that backs
-- the UNIQUE constraint above and may be redundant; confirm before dropping.
CREATE INDEX idx_dag_nodes_item
    ON dag_nodes (item_id);
CREATE INDEX idx_dag_nodes_item_rev
    ON dag_nodes (item_id, revision_number);

-- Partial index: covers only nodes whose state is not 'clean'.
CREATE INDEX idx_dag_nodes_state
    ON dag_nodes (validation_state)
    WHERE validation_state <> 'clean';

CREATE INDEX idx_dag_nodes_type
    ON dag_nodes (node_type);
--------------------------------------------------------------------------------
-- dag_edges: dependency edges between two dag_nodes rows of a single item.
-- Direction: source → target means "target depends on source".
-- An edge disappears when either endpoint node is deleted.
--------------------------------------------------------------------------------
CREATE TABLE dag_edges (
    id             UUID  PRIMARY KEY DEFAULT gen_random_uuid(),
    source_node_id UUID  NOT NULL REFERENCES dag_nodes (id) ON DELETE CASCADE,
    target_node_id UUID  NOT NULL REFERENCES dag_nodes (id) ON DELETE CASCADE,
    edge_type      TEXT  NOT NULL DEFAULT 'depends_on',
    metadata       JSONB DEFAULT '{}',
    -- At most one edge of a given type between an ordered pair of nodes.
    UNIQUE (source_node_id, target_node_id, edge_type),
    -- A node may not depend on itself.
    CONSTRAINT no_self_edge CHECK (source_node_id <> target_node_id)
);

-- One index per endpoint so edges can be found from either side.
CREATE INDEX idx_dag_edges_source
    ON dag_edges (source_node_id);
CREATE INDEX idx_dag_edges_target
    ON dag_edges (target_node_id);
--------------------------------------------------------------------------------
-- dag_cross_edges: edges linking feature nodes across BOM boundaries, i.e.
-- between dag_nodes rows that belong to different items.
-- An edge disappears when either endpoint node is deleted. If the associated
-- relationships row is deleted, the edge is kept and relationship_id becomes
-- NULL.
--------------------------------------------------------------------------------
CREATE TABLE dag_cross_edges (
    id              UUID  PRIMARY KEY DEFAULT gen_random_uuid(),
    source_node_id  UUID  NOT NULL REFERENCES dag_nodes (id) ON DELETE CASCADE,
    target_node_id  UUID  NOT NULL REFERENCES dag_nodes (id) ON DELETE CASCADE,
    relationship_id UUID  REFERENCES relationships (id) ON DELETE SET NULL,
    edge_type       TEXT  NOT NULL DEFAULT 'assembly_ref',
    metadata        JSONB DEFAULT '{}',
    -- At most one cross edge between an ordered pair of nodes.
    UNIQUE (source_node_id, target_node_id),
    -- Mirrors dag_edges.no_self_edge: a node linked to itself can never be a
    -- cross-item edge, so reject it at the schema level.
    CONSTRAINT no_self_cross_edge CHECK (source_node_id <> target_node_id)
);

-- One index per endpoint so edges can be found from either side.
CREATE INDEX idx_dag_cross_source
    ON dag_cross_edges (source_node_id);
CREATE INDEX idx_dag_cross_target
    ON dag_cross_edges (target_node_id);

-- PostgreSQL does not index referencing columns automatically. Without this
-- index every DELETE on relationships has to scan dag_cross_edges to apply
-- ON DELETE SET NULL.
CREATE INDEX idx_dag_cross_relationship
    ON dag_cross_edges (relationship_id);
COMMIT;

View File

@@ -0,0 +1,109 @@
-- Worker system: runners, job definitions, jobs, and job log.
-- Migration: 015_jobs_runners
-- Date: 2026-02
BEGIN;
--------------------------------------------------------------------------------
-- runners: registered compute workers, one row per uniquely named runner.
--------------------------------------------------------------------------------
CREATE TABLE runners (
    id             UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    name           TEXT        NOT NULL UNIQUE,
    token_hash     TEXT        NOT NULL,
    token_prefix   TEXT        NOT NULL,
    tags           TEXT[]      NOT NULL DEFAULT '{}',
    status         TEXT        NOT NULL DEFAULT 'offline',
    last_heartbeat TIMESTAMPTZ,
    -- Plain UUID with no foreign key; the jobs table is only created further
    -- down in this migration.
    last_job_id    UUID,
    metadata       JSONB       DEFAULT '{}',
    created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_runners_status
    ON runners (status);

-- Non-unique lookup index on the token hash.
CREATE INDEX idx_runners_token
    ON runners (token_hash);
--------------------------------------------------------------------------------
-- job_definitions: job definitions parsed from YAML, stored for reference and
-- as a foreign-key target. The full parsed document is kept in "definition";
-- the remaining columns hold individual settings as typed values.
--------------------------------------------------------------------------------
CREATE TABLE job_definitions (
    id              UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    name            TEXT        NOT NULL UNIQUE,
    version         INTEGER     NOT NULL DEFAULT 1,
    trigger_type    TEXT        NOT NULL,
    scope_type      TEXT        NOT NULL,
    compute_type    TEXT        NOT NULL,
    runner_tags     TEXT[]      NOT NULL DEFAULT '{}',
    timeout_seconds INTEGER     NOT NULL DEFAULT 600,
    max_retries     INTEGER     NOT NULL DEFAULT 1,
    priority        INTEGER     NOT NULL DEFAULT 100,
    definition      JSONB       NOT NULL,
    enabled         BOOLEAN     NOT NULL DEFAULT true,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_job_defs_trigger
    ON job_definitions (trigger_type);

-- Partial index: contains enabled definitions only.
CREATE INDEX idx_job_defs_enabled
    ON job_definitions (enabled)
    WHERE enabled = true;
--------------------------------------------------------------------------------
-- Jobs (individual compute job instances)
--
-- job_status: the set of states a row in the jobs table can be in.
--------------------------------------------------------------------------------
CREATE TYPE job_status AS ENUM (
    'pending',
    'claimed',
    'running',
    'completed',
    'failed',
    'cancelled'
);
-- jobs: one row per compute job instance.
-- Deleting the referenced item deletes the job. Deleting the referenced job
-- definition, project or runner keeps the job and sets that reference to NULL.
CREATE TABLE jobs (
    id                UUID        PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to run. definition_name is stored on the row itself, so it stays
    -- populated even after job_definition_id has been nulled out.
    job_definition_id UUID        REFERENCES job_definitions (id) ON DELETE SET NULL,
    definition_name   TEXT        NOT NULL,
    status            job_status  NOT NULL DEFAULT 'pending',
    priority          INTEGER     NOT NULL DEFAULT 100,

    -- What the job applies to.
    item_id           UUID        REFERENCES items (id) ON DELETE CASCADE,
    project_id        UUID        REFERENCES projects (id) ON DELETE SET NULL,
    scope_metadata    JSONB       DEFAULT '{}',

    -- Runner assignment.
    runner_id         UUID        REFERENCES runners (id) ON DELETE SET NULL,
    runner_tags       TEXT[]      NOT NULL DEFAULT '{}',

    -- Lifecycle timestamps and timeout.
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    claimed_at        TIMESTAMPTZ,
    started_at        TIMESTAMPTZ,
    completed_at      TIMESTAMPTZ,
    timeout_seconds   INTEGER     NOT NULL DEFAULT 600,
    expires_at        TIMESTAMPTZ,

    -- Progress and outcome.
    progress          INTEGER     DEFAULT 0,
    progress_message  TEXT,
    result            JSONB,
    error_message     TEXT,

    -- Retry bookkeeping.
    retry_count       INTEGER     NOT NULL DEFAULT 0,
    max_retries       INTEGER     NOT NULL DEFAULT 1,

    -- Actor identifiers stored as free text (no foreign key).
    created_by        TEXT,
    cancelled_by      TEXT
);

CREATE INDEX idx_jobs_status
    ON jobs (status);

-- Partial index over pending jobs only, ordered by priority then creation time.
CREATE INDEX idx_jobs_pending
    ON jobs (status, priority, created_at)
    WHERE status = 'pending';

-- Supporting indexes for the foreign-key columns. PostgreSQL does not create
-- these automatically, and without them a DELETE on the referenced table has
-- to scan jobs to apply its ON DELETE action.
CREATE INDEX idx_jobs_item
    ON jobs (item_id);
CREATE INDEX idx_jobs_project
    ON jobs (project_id);
CREATE INDEX idx_jobs_runner
    ON jobs (runner_id);
CREATE INDEX idx_jobs_definition
    ON jobs (job_definition_id);
--------------------------------------------------------------------------------
-- job_log: append-only progress entries attached to a job.
-- Entries are deleted together with the job they belong to.
--------------------------------------------------------------------------------
CREATE TABLE job_log (
    id        UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id    UUID        NOT NULL REFERENCES jobs (id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    level     TEXT        NOT NULL DEFAULT 'info',
    message   TEXT        NOT NULL,
    metadata  JSONB       DEFAULT '{}'
);

-- Supports reading one job's entries in timestamp order.
CREATE INDEX idx_job_log_job
    ON job_log (job_id, timestamp);
COMMIT;