package db import ( "context" "encoding/json" "time" "github.com/jackc/pgx/v5" ) // Integration represents an ERP integration configuration. type Integration struct { ID string Name string Enabled bool Config map[string]any CreatedAt time.Time UpdatedAt time.Time } // SyncLog represents a sync log entry. type SyncLog struct { ID string IntegrationID string ItemID *string Direction string Status string ExternalID string ExternalModel string RequestPayload json.RawMessage ResponsePayload json.RawMessage ErrorMessage string StartedAt *time.Time CompletedAt *time.Time CreatedAt time.Time } // IntegrationRepository provides integration database operations. type IntegrationRepository struct { db *DB } // NewIntegrationRepository creates a new integration repository. func NewIntegrationRepository(db *DB) *IntegrationRepository { return &IntegrationRepository{db: db} } // GetByName returns an integration by name. func (r *IntegrationRepository) GetByName(ctx context.Context, name string) (*Integration, error) { row := r.db.pool.QueryRow(ctx, ` SELECT id, name, enabled, config, created_at, updated_at FROM integrations WHERE name = $1 `, name) var i Integration var configJSON []byte err := row.Scan(&i.ID, &i.Name, &i.Enabled, &configJSON, &i.CreatedAt, &i.UpdatedAt) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, err } if len(configJSON) > 0 { if err := json.Unmarshal(configJSON, &i.Config); err != nil { return nil, err } } return &i, nil } // Upsert creates or updates an integration by name. func (r *IntegrationRepository) Upsert(ctx context.Context, name string, enabled bool, config map[string]any) error { configJSON, err := json.Marshal(config) if err != nil { return err } _, err = r.db.pool.Exec(ctx, ` INSERT INTO integrations (name, enabled, config) VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET enabled = EXCLUDED.enabled, config = EXCLUDED.config, updated_at = now() `, name, enabled, configJSON) return err } // CreateSyncLog inserts a new sync log entry. func (r *IntegrationRepository) CreateSyncLog(ctx context.Context, entry *SyncLog) error { _, err := r.db.pool.Exec(ctx, ` INSERT INTO sync_log (integration_id, item_id, direction, status, external_id, external_model, request_payload, response_payload, error_message, started_at, completed_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) `, entry.IntegrationID, entry.ItemID, entry.Direction, entry.Status, entry.ExternalID, entry.ExternalModel, entry.RequestPayload, entry.ResponsePayload, entry.ErrorMessage, entry.StartedAt, entry.CompletedAt) return err } // ListSyncLog returns recent sync log entries for an integration. func (r *IntegrationRepository) ListSyncLog(ctx context.Context, integrationID string, limit int) ([]*SyncLog, error) { if limit <= 0 { limit = 50 } rows, err := r.db.pool.Query(ctx, ` SELECT id, integration_id, item_id, direction, status, external_id, external_model, request_payload, response_payload, error_message, started_at, completed_at, created_at FROM sync_log WHERE integration_id = $1 ORDER BY created_at DESC LIMIT $2 `, integrationID, limit) if err != nil { return nil, err } defer rows.Close() var logs []*SyncLog for rows.Next() { var l SyncLog err := rows.Scan( &l.ID, &l.IntegrationID, &l.ItemID, &l.Direction, &l.Status, &l.ExternalID, &l.ExternalModel, &l.RequestPayload, &l.ResponsePayload, &l.ErrorMessage, &l.StartedAt, &l.CompletedAt, &l.CreatedAt, ) if err != nil { return nil, err } logs = append(logs, &l) } return logs, rows.Err() }