From 83e0d6821c5544fa084cc886cb9e25d6cfb8ab5f Mon Sep 17 00:00:00 2001 From: Forbes Date: Sat, 14 Feb 2026 13:04:41 -0600 Subject: [PATCH] feat: add database migrations for DAG and worker system Migration 014: dag_nodes, dag_edges, dag_cross_edges tables for the feature-level dependency graph with validation state tracking. Migration 015: runners, job_definitions, jobs, job_log tables for the async compute job system with PostgreSQL-backed work queue. Update TruncateAll in testutil to include new tables. --- internal/testutil/testutil.go | 2 + migrations/014_dag_nodes_edges.sql | 67 ++++++++++++++++++ migrations/015_jobs_runners.sql | 109 +++++++++++++++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 migrations/014_dag_nodes_edges.sql create mode 100644 migrations/015_jobs_runners.sql diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index d7c62c9..58a0e0f 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -80,6 +80,8 @@ func TruncateAll(t *testing.T, pool *pgxpool.Pool) { _, err := pool.Exec(context.Background(), ` TRUNCATE + job_log, jobs, job_definitions, runners, + dag_cross_edges, dag_edges, dag_nodes, audit_log, sync_log, api_tokens, sessions, item_files, item_projects, relationships, revisions, inventory, items, projects, sequences_by_name, users, property_migrations diff --git a/migrations/014_dag_nodes_edges.sql b/migrations/014_dag_nodes_edges.sql new file mode 100644 index 0000000..866f8f9 --- /dev/null +++ b/migrations/014_dag_nodes_edges.sql @@ -0,0 +1,67 @@ +-- Dependency DAG: feature-level nodes and edges within items. +-- Migration: 014_dag_nodes_edges +-- Date: 2026-02 + +BEGIN; + +-------------------------------------------------------------------------------- +-- DAG Nodes (feature-level nodes within an item's revision) +-------------------------------------------------------------------------------- + +CREATE TABLE dag_nodes ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + item_id UUID NOT NULL REFERENCES items(id) ON DELETE CASCADE, + revision_number INTEGER NOT NULL, + node_key TEXT NOT NULL, + node_type TEXT NOT NULL, + properties_hash TEXT, + validation_state TEXT NOT NULL DEFAULT 'clean', + validation_msg TEXT, + metadata JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(item_id, revision_number, node_key) +); + +CREATE INDEX idx_dag_nodes_item ON dag_nodes(item_id); +CREATE INDEX idx_dag_nodes_item_rev ON dag_nodes(item_id, revision_number); +CREATE INDEX idx_dag_nodes_state ON dag_nodes(validation_state) + WHERE validation_state != 'clean'; +CREATE INDEX idx_dag_nodes_type ON dag_nodes(node_type); + +-------------------------------------------------------------------------------- +-- DAG Edges (dependencies between nodes within a single item) +-- Direction: source → target means "target depends on source" +-------------------------------------------------------------------------------- + +CREATE TABLE dag_edges ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_node_id UUID NOT NULL REFERENCES dag_nodes(id) ON DELETE CASCADE, + target_node_id UUID NOT NULL REFERENCES dag_nodes(id) ON DELETE CASCADE, + edge_type TEXT NOT NULL DEFAULT 'depends_on', + metadata JSONB DEFAULT '{}', + UNIQUE(source_node_id, target_node_id, edge_type), + CONSTRAINT no_self_edge CHECK (source_node_id != target_node_id) +); + +CREATE INDEX idx_dag_edges_source ON dag_edges(source_node_id); +CREATE INDEX idx_dag_edges_target ON dag_edges(target_node_id); + +-------------------------------------------------------------------------------- +-- Cross-item DAG edges (linking feature nodes across BOM boundaries) +-------------------------------------------------------------------------------- + +CREATE TABLE dag_cross_edges ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_node_id UUID NOT NULL REFERENCES dag_nodes(id) ON DELETE CASCADE, + target_node_id UUID NOT NULL REFERENCES dag_nodes(id) ON DELETE CASCADE, + relationship_id UUID REFERENCES relationships(id) ON DELETE SET NULL, + edge_type TEXT NOT NULL DEFAULT 'assembly_ref', + metadata JSONB DEFAULT '{}', + UNIQUE(source_node_id, target_node_id) +); + +CREATE INDEX idx_dag_cross_source ON dag_cross_edges(source_node_id); +CREATE INDEX idx_dag_cross_target ON dag_cross_edges(target_node_id); + +COMMIT; diff --git a/migrations/015_jobs_runners.sql b/migrations/015_jobs_runners.sql new file mode 100644 index 0000000..0dd202a --- /dev/null +++ b/migrations/015_jobs_runners.sql @@ -0,0 +1,109 @@ +-- Worker system: runners, job definitions, jobs, and job log. +-- Migration: 015_jobs_runners +-- Date: 2026-02 + +BEGIN; + +-------------------------------------------------------------------------------- +-- Runners (registered compute workers) +-------------------------------------------------------------------------------- + +CREATE TABLE runners ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT UNIQUE NOT NULL, + token_hash TEXT NOT NULL, + token_prefix TEXT NOT NULL, + tags TEXT[] NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'offline', + last_heartbeat TIMESTAMPTZ, + last_job_id UUID, + metadata JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_runners_status ON runners(status); +CREATE INDEX idx_runners_token ON runners(token_hash); + +-------------------------------------------------------------------------------- +-- Job Definitions (parsed from YAML, stored for reference and FK) +-------------------------------------------------------------------------------- + +CREATE TABLE job_definitions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT UNIQUE NOT NULL, + version INTEGER NOT NULL DEFAULT 1, + trigger_type TEXT NOT NULL, + scope_type TEXT NOT NULL, + compute_type TEXT NOT NULL, + runner_tags TEXT[] NOT NULL DEFAULT '{}', + timeout_seconds INTEGER NOT NULL DEFAULT 600, + max_retries INTEGER NOT NULL DEFAULT 1, + priority INTEGER NOT NULL DEFAULT 100, + definition JSONB NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_job_defs_trigger ON job_definitions(trigger_type); +CREATE INDEX idx_job_defs_enabled ON job_definitions(enabled) WHERE enabled = true; + +-------------------------------------------------------------------------------- +-- Jobs (individual compute job instances) +-------------------------------------------------------------------------------- + +CREATE TYPE job_status AS ENUM ( + 'pending', 'claimed', 'running', 'completed', 'failed', 'cancelled' +); + +CREATE TABLE jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_definition_id UUID REFERENCES job_definitions(id) ON DELETE SET NULL, + definition_name TEXT NOT NULL, + status job_status NOT NULL DEFAULT 'pending', + priority INTEGER NOT NULL DEFAULT 100, + item_id UUID REFERENCES items(id) ON DELETE CASCADE, + project_id UUID REFERENCES projects(id) ON DELETE SET NULL, + scope_metadata JSONB DEFAULT '{}', + runner_id UUID REFERENCES runners(id) ON DELETE SET NULL, + runner_tags TEXT[] NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + claimed_at TIMESTAMPTZ, + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + timeout_seconds INTEGER NOT NULL DEFAULT 600, + expires_at TIMESTAMPTZ, + progress INTEGER DEFAULT 0, + progress_message TEXT, + result JSONB, + error_message TEXT, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 1, + created_by TEXT, + cancelled_by TEXT +); + +CREATE INDEX idx_jobs_status ON jobs(status); +CREATE INDEX idx_jobs_pending ON jobs(status, priority, created_at) + WHERE status = 'pending'; +CREATE INDEX idx_jobs_item ON jobs(item_id); +CREATE INDEX idx_jobs_runner ON jobs(runner_id); +CREATE INDEX idx_jobs_definition ON jobs(job_definition_id); + +-------------------------------------------------------------------------------- +-- Job Log (append-only progress entries) +-------------------------------------------------------------------------------- + +CREATE TABLE job_log ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE, + timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), + level TEXT NOT NULL DEFAULT 'info', + message TEXT NOT NULL, + metadata JSONB DEFAULT '{}' +); + +CREATE INDEX idx_job_log_job ON job_log(job_id, timestamp); + +COMMIT;