Let's Build a Customer Support AI Copilot: An Event-Driven Agent with LangGraph, Go, pgvector & Redis Streams [Part 1]

Karan Kashyap
June 19, 2026
In this series, we are going to build A grounded, event-driven AI customer-support copilot built on Go, LangGraph, pgvector, and Redis Streams.
Support teams spend hours re-typing the same answers. Resolver solves that: for every incoming customer message it classifies intent, retrieves grounded knowledge, drafts a cited reply, checks it against policy, and only hands off to a human when confidence is too low to trust automation.
Part 1: Architecture & Contracts — GraphQL Schema, Redis Events & Postgres
This is a six-part series. In this post we lay out the full system architecture and pin down every inter-service contract before writing a line of application code. Part 2 covers the Go GraphQL API.
What We're Building
Resolver is an event-driven AI copilot for customer support. The golden path looks like this:
- Customer message arrives via a GraphQL mutation.
- The Go API validates, persists, and publishes a
message.createdevent to Redis Streams — then returns immediately. - A Python worker consumes the event and runs a LangGraph state machine: triage → retrieve (RAG) → draft (grounded) → guard (policy) → decide.
- If confidence is high enough the draft becomes
SUGGESTED. Otherwise it'sESCALATEDto a human. - The result streams back to the UI in real-time via a GraphQL subscription bridged from the drafts Redis stream.
- A support agent reviews, edits if needed, and approves. Every action is audited.

The system runs 100% locally at $0 via docker compose up — Postgres + pgvector, Redis, Ollama, and the three application services. No paid third-party required.
System Architecture
The core insight: the Go API never calls the LLM. It validates, persists, and publishes one event. All the expensive AI work happens in the Python worker, which scales independently. Workers talk back only via the typed event schema on Redis — never by calling the API directly.
The LangGraph Agent Graph
The Python worker is a LangGraph state machine. Each node is a pure function with a typed input/output — individually testable, easy to inspect.
Node responsibilities at a glance:
| Node | Does |
|---|---|
| Triage | Classifies intent, category, sentiment, urgency with a small/cheap model |
| Retrieve | Hybrid (vector + keyword) KB search → re-rank → attach top-k sources |
| Draft | LLM generates {answer, citations[], confidence} grounded only in retrieved chunks |
| Guard | Checks groundedness, tone, forbidden actions — pass/fail + reasons |
| Decision | Routes: high confidence + pass → Finalize; fail → Repair (once); low conf → Escalate |
| Repair | One corrective pass feeding guard feedback back to Draft |
| Finalize / Escalate | Terminal states — writes draft + status to Postgres |
Three Contracts First
Before any application code, we nail down the three contracts that every layer depends on.
Contract 1 — The GraphQL Schema
The schema is the source of truth for both the Go API (gqlgen generates server types from it) and the Next.js frontend (graphql-codegen generates TypeScript types). Change the schema; regenerate both sides.
1# packages/graphql/schema.graphql23scalar DateTime4scalar JSON56type Conversation {7 id: ID!8 status: ConvStatus!9 messages: [Message!]!10 createdAt: DateTime!11}1213type Message {14 id: ID!15 role: Role!16 body: String!17 draft: Draft # populated once the agent pipeline finishes18 createdAt: DateTime!19}2021type Draft {22 id: ID!23 messageId: ID!24 conversationId: ID!25 intent: String!26 category: String!27 sentiment: Sentiment!28 urgency: Urgency!29 answer: String!30 citations: [Citation!]!31 suggestedAction: String32 confidence: Float!33 status: DraftStatus!34 guard: GuardReport!35 createdAt: DateTime!36}3738type Citation {39 kbId: ID!40 title: String!41 snippet: String!42}4344type GuardReport {45 grounded: Boolean!46 tone: Boolean!47 policy: Boolean!48 reasons: [String!]!49}5051enum Role { CUSTOMER AGENT SYSTEM }52enum ConvStatus { OPEN ESCALATED RESOLVED }53enum DraftStatus { PENDING SUGGESTED ESCALATED SENT REJECTED }54enum Sentiment { POSITIVE NEUTRAL NEGATIVE }55enum Urgency { LOW NORMAL HIGH }5657type Query {58 conversation(id: ID!): Conversation59 queue(status: DraftStatus, limit: Int = 25, cursor: String): DraftConnection!60 evalRuns(limit: Int = 10): [EvalRun!]!61 dashboardStats: DashboardStats!62 searchKB(query: String!, limit: Int = 8): [Citation!]!63}6465type Mutation {66 ingestMessage(input: IngestInput!): Message! # validate + persist + publish event67 approveReply(draftId: ID!, edited: String): Draft! # human send68 rejectReply(draftId: ID!, reason: String): Draft!69 escalate(draftId: ID!): Draft!70 reprocess(messageId: ID!): Message!71}7273type Subscription {74 draftUpdates(conversationId: ID): Draft! # bridged from the Redis drafts stream75}7677input IngestInput { conversationId: ID, customerRef: String, body: String! }
Key design decisions baked in:
draftis nullable onMessage— it doesn't exist until the worker finishes.DraftStatushas five states:PENDING → SUGGESTED(confident) orESCALATED(not), thenSENT/REJECTEDafter human action.citationsandguardare first-class fields — grounding is not optional.confidenceis aFloat(0–1), surfaced directly in the UI as the confidence meter.draftUpdatessubscription filters byconversationId— the UI only receives events for the open conversation.
Contract 2 — The Redis Event Schema
The Go API and Python worker communicate only through typed events on Redis Streams. No direct HTTP calls between them. The schema is defined as JSON Schema so both sides can validate before parsing.
1// packages/events/events.schema.json2{3 "$schema": "https://json-schema.org/draft/2020-12/schema",4 "title": "Resolver Event Contract v1",5 "x-streams": {6 "messages": "API → Worker. message.created events.",7 "drafts": "Worker → API. draft.ready / draft.escalated; bridged to draftUpdates.",8 "dead-letter": "Worker → ops. Poison messages after retry exhaustion."9 },10 "x-schema-version": 1,11 "oneOf": [12 { "$ref": "#/$defs/messageCreated" },13 { "$ref": "#/$defs/draftReady" },14 { "$ref": "#/$defs/draftEscalated"}15 ],16 "$defs": {17 "messageCreated": {18 "required": ["schema_version","type","event_id","message_id",19 "conversation_id","body","created_at"],20 "properties": {21 "type": { "const": "message.created" },22 "event_id": { "type": "string",23 "description": "ULID/UUID. Dedupe key for idempotent workers." },24 "message_id": { "type": "string", "format": "uuid" },25 "conversation_id": { "type": "string", "format": "uuid" },26 "body": { "type": "string", "minLength": 1 },27 "trace_id": { "type": "string",28 "description": "OTel trace id propagated API→bus→worker." }29 }30 },31 "draftReady": {32 "required": ["type","event_id","draft_id","message_id","conversation_id","status"],33 "properties": {34 "type": { "const": "draft.ready" },35 "status": { "const": "SUGGESTED" }36 }37 },38 "draftEscalated": {39 "required": ["type","event_id","draft_id","message_id","conversation_id","status","reason"],40 "properties": {41 "type": { "const": "draft.escalated" },42 "status": { "const": "ESCALATED" },43 "reason": {44 "type": "string",45 "description":46 "low_confidence|guard_failed|repair_exhausted|forbidden_action|retrieval_weak"47 }48 }49 }50 }51}
Three things worth noting:
event_idis the dedupe key. Workers check this before processing. Publishing the same event twice is safe; processing it twice is not.trace_idpropagates the OTel trace from the API HTTP request through the event bus all the way into the worker's LLM call — one unbroken trace.reasonon escalation is typed. Knowing why a draft escalated (weak retrieval vs. guard failure vs. low confidence) lets the dashboard surface actionable quality signals.
Contract 3 — The Database Schema
Postgres holds every durable entity: conversations, messages, KB documents with their vector embeddings, drafts, eval runs, and the audit log.
1-- db/migrations/000001_init.up.sql23CREATE EXTENSION IF NOT EXISTS vector; -- pgvector: cosine ANN search4CREATE EXTENSION IF NOT EXISTS pgcrypto; -- gen_random_uuid()56CREATE TABLE conversations (7 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),8 external_id TEXT UNIQUE,9 customer_ref TEXT NOT NULL,10 status TEXT NOT NULL DEFAULT 'OPEN'11 CHECK (status IN ('OPEN','ESCALATED','RESOLVED')),12 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()13);1415CREATE TABLE messages (16 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),17 conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,18 role TEXT NOT NULL CHECK (role IN ('CUSTOMER','AGENT','SYSTEM')),19 body TEXT NOT NULL,20 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()21);2223CREATE TABLE kb_documents (24 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),25 source TEXT NOT NULL, -- 'bitext' | 'policy' | 'manual'26 intent TEXT NOT NULL,27 category TEXT NOT NULL,28 title TEXT NOT NULL,29 content TEXT NOT NULL,30 embedding VECTOR(768) NOT NULL -- pgvector; hnsw index in migration 231);3233CREATE TABLE drafts (34 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),35 message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE,36 intent TEXT NOT NULL,37 category TEXT NOT NULL,38 sentiment TEXT NOT NULL CHECK (sentiment IN ('POSITIVE','NEUTRAL','NEGATIVE')),39 urgency TEXT NOT NULL CHECK (urgency IN ('LOW','NORMAL','HIGH')),40 answer TEXT NOT NULL,41 citations JSONB NOT NULL DEFAULT '[]', -- [{kb_id, title, snippet}]42 suggested_action TEXT,43 confidence NUMERIC NOT NULL CHECK (confidence >= 0 AND confidence <= 1),44 status TEXT NOT NULL45 CHECK (status IN ('PENDING','SUGGESTED','ESCALATED','SENT','REJECTED')),46 guard JSONB NOT NULL DEFAULT '{}', -- {grounded, tone, policy, reasons[]}47 model TEXT NOT NULL,48 tokens INT NOT NULL DEFAULT 0,49 cost_cents NUMERIC NOT NULL DEFAULT 0,50 latency_ms INT,51 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()52);5354CREATE TABLE eval_runs (55 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),56 dataset TEXT NOT NULL,57 n INT NOT NULL,58 groundedness NUMERIC NOT NULL,59 routing_accuracy NUMERIC NOT NULL,60 answer_score NUMERIC NOT NULL,61 retrieval_recall NUMERIC,62 safety_violations INT NOT NULL DEFAULT 0,63 avg_cost_cents NUMERIC,64 p95_latency_ms INT,65 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()66);6768CREATE TABLE audit_log (69 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),70 draft_id UUID NOT NULL,71 actor TEXT NOT NULL,72 action TEXT NOT NULL,73 before JSONB,74 after JSONB,75 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()76);
Then migration 2 adds the performance indexes:
1-- db/migrations/000002_indexes.up.sql23-- Conversation timeline reads.4CREATE INDEX idx_messages_conv_created ON messages (conversation_id, created_at);56-- Queue filtering by draft status.7CREATE INDEX idx_drafts_status ON drafts (status);89-- Hybrid-retrieval pre-filter by predicted intent/category.10CREATE INDEX idx_kb_intent_cat ON kb_documents (intent, category);1112-- ANN cosine search over embeddings (HNSW = low-latency approximate nearest neighbour).13CREATE INDEX idx_kb_embedding ON kb_documents14 USING hnsw (embedding vector_cosine_ops);1516-- Keyword half of hybrid retrieval.17CREATE INDEX idx_kb_content_fts ON kb_documents18 USING gin (to_tsvector('english', title || ' ' || content));
The HNSW index is the heart of hybrid retrieval. The GIN index on to_tsvector is the keyword half — Postgres FTS, no external search engine required.
The Local Stack
Everything runs in one docker compose up. Migrations run as a one-shot service and complete before the API boots.
1# deploy/docker-compose.yml (abridged)23services:4 postgres:5 image: pgvector/pgvector:pg166 healthcheck:7 test: ["CMD-SHELL", "pg_isready -U resolver -d resolver"]8 interval: 5s9 retries: 101011 redis:12 image: redis:7.4-alpine13 command: ["redis-server", "--appendonly", "yes"]1415 ollama:16 image: ollama/ollama:0.5.717 deploy:18 resources:19 reservations:20 devices:21 - driver: nvidia22 count: all23 capabilities: [gpu]2425 migrate:26 image: migrate/migrate:v4.18.127 depends_on:28 postgres: { condition: service_healthy }29 volumes:30 - ../db/migrations:/migrations:ro31 command: ["-path=/migrations", "-database=postgres://...", "up"]32 restart: on-failure3334 api:35 build: { context: .., dockerfile: services/api/Dockerfile }36 depends_on:37 postgres: { condition: service_healthy }38 redis: { condition: service_healthy }39 migrate: { condition: service_completed_successfully }40 ports: ["8080:8080"]4142 worker:43 build: { context: ../workers/agent }44 depends_on: [postgres, redis, ollama, migrate]45 restart: on-failure4647 web:48 build: { context: .., dockerfile: apps/web/Dockerfile }49 depends_on: [api]50 ports: ["3000:3000"]
postgres: { condition: service_healthy } means the API waits for pg_isready before it starts, not just for the container to be running. Migrations get service_completed_successfully — the API will never boot against an un-migrated schema.
Quickstart
1# Clone and boot2git clone https://github.com/karankashyap/resolver3cd resolver45cp .env.example .env6docker compose -f deploy/docker-compose.yml up78# Pull local models (first run only)9docker compose -f deploy/docker-compose.yml exec ollama ollama pull nomic-embed-text10docker compose -f deploy/docker-compose.yml exec ollama ollama pull qwen2.5:3b11docker compose -f deploy/docker-compose.yml exec ollama ollama pull qwen2.5:7b1213# Seed the knowledge base from the Bitext dataset14make ingest1516# Open the GraphQL playground17open http://localhost:8080
![Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 4]](/_next/image/?url=https%3A%2F%2Fcdn.sanity.io%2Fimages%2F3e1sexdu%2Fproduction%2Feeb1314f51d4c39e5d1e176c2c837de8f33725ca-1600x739.png%3Frect%3D61%2C0%2C1478%2C739%26w%3D800%26h%3D400%26q%3D85%26fit%3Dcrop%26auto%3Dformat&w=3840&q=75)