
Let's Build a Customer Support AI Copilot: An Event-Driven Agent with LangGraph, Go, pgvector & Redis Streams [Part 3]

Karan Kashyap

June 26, 2026

Part 3: The LangGraph Agent — Triage, RAG Retrieval & Grounded Drafting

Event-driven AI pipeline: idempotent consumer groups, a six-node LangGraph state machine, hybrid RAG retrieval, deterministic guard rails, and confidence-based routing — all in Python.


Package Layout

```text
workers/agent/
├── main.py              # entrypoint: wire → consume → handle → repeat
├── bus.py               # Redis Streams consumer group (XREADGROUP + XAUTOCLAIM)
├── config.py            # env config, validated at startup
├── db.py                # psycopg connection + draft insert
├── schemas.py           # Pydantic types: wire events + node outputs + GraphState
├── policy.py            # forbidden-action allow-list (deterministic, not the model)
├── obs.py               # OTel tracing
├── graph/
│   ├── build.py         # assemble + compile the StateGraph
│   ├── deps.py          # Deps dataclass: wires LLM, retriever, config into nodes
│   └── nodes/
│       ├── triage.py    # classify intent / category / sentiment / urgency
│       ├── retrieve.py  # hybrid KB search, weak-retrieval flag
│       ├── draft.py     # grounded reply generation (structured output + retry)
│       ├── guard.py     # deterministic groundedness + policy + tone checks
│       ├── decision.py  # single routing authority: finalize | repair | escalate
│       └── repair.py    # increment counter, package guard feedback
├── rag/
│   ├── retriever.py     # Retriever: vector + keyword → RRF → lexical rerank
│   ├── fusion.py        # Reciprocal Rank Fusion
│   └── rerank.py        # query-term coverage blended rerank
├── llm/
│   ├── client.py        # ChatLLM + Embeddings protocols, Ollama + OpenAI impls
│   └── prompts.py       # versioned system + user prompt templates
└── tests/
    ├── fakes.py         # FakeChat, FakeRetriever, builders; zero network/DB
    └── test_nodes.py    # unit tests per node
```

The Shared State

Every node in the graph reads from and writes to a single GraphState TypedDict. LangGraph threads this state through the graph: each node returns a partial update, and an absent key means "not computed yet."

workers/agent/schemas.py
```python
# workers/agent/schemas.py
from typing import Literal, Optional, TypedDict


class GraphState(TypedDict, total=False):
    message: MessageCreated          # the inbound event
    trace_id: str                    # OTel trace id from the API (continues the trace)

    triage: TriageResult             # populated by: classify
    retrieved: list[RetrievedDoc]
    retrieval_weak: bool             # True = best hit too distant → escalate, don't guess

    draft: DraftResult               # populated by: compose
    guard: GuardReport               # populated by: review

    repair_count: int                # how many repair passes so far
    repair_feedback: str             # guard reasons → fed back to the draft prompt

    decision: Literal["finalize", "repair", "escalate"]
    escalation_reason: Optional[EscalationReason]

    # Accounting for the drafts row.
    model: str
    tokens_used: int
    cost_cents: float
```

total=False means every key is optional. A node that doesn't need retrieved simply doesn't read it — no None checks scattered across nodes.
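To make the partial-update contract concrete, here is a minimal sketch in plain Python. It is not LangGraph itself: `apply_update` is a hypothetical stand-in for the merge that happens between nodes, assuming the default behavior for keys without a reducer (a returned key overwrites the old value, and keys the node did not return are left untouched). `MiniState` and the two node functions are illustrative only.

```python
from typing import TypedDict


class MiniState(TypedDict, total=False):
    # Hypothetical, trimmed-down stand-in for GraphState.
    body: str
    intent: str
    tokens_used: int


def classify(state: MiniState) -> dict:
    # Returns only the keys it computed; everything else is left alone.
    return {"intent": "billing", "tokens_used": state.get("tokens_used", 0) + 10}


def compose(state: MiniState) -> dict:
    # Accumulates onto whatever earlier nodes already recorded.
    return {"tokens_used": state.get("tokens_used", 0) + 20}


def apply_update(state: MiniState, update: dict) -> MiniState:
    # Assumed merge rule: keys in the update overwrite, absent keys persist.
    merged: MiniState = {**state, **update}
    return merged


state: MiniState = {"body": "Where is my invoice?"}
for node in (classify, compose):
    state = apply_update(state, node(state))

print(state)
```

The `state.get("tokens_used", 0)` pattern is the same one the real nodes use to accumulate accounting fields without assuming an earlier node has run.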

The structured output types each node produces are separately Pydantic-validated:

```python
from typing import Optional

from pydantic import BaseModel, Field


class TriageResult(BaseModel):
    intent: str = Field(min_length=1)
    category: str = Field(min_length=1)
    sentiment: Sentiment             # Literal["POSITIVE", "NEUTRAL", "NEGATIVE"]
    urgency: Urgency                 # Literal["LOW", "NORMAL", "HIGH"]
    confidence: float = Field(ge=0.0, le=1.0)


class DraftResult(BaseModel):
    model_config = {"extra": "forbid"}
    answer: str = Field(min_length=1)
    citations: list[Citation] = Field(default_factory=list)
    suggested_action: Optional[str] = None
    confidence: float = Field(ge=0.0, le=1.0)


class GuardReport(BaseModel):
    grounded: bool
    tone_ok: bool
    policy_ok: bool
    passed: bool                     # AND of the three checks
    reasons: list[str] = Field(default_factory=list)
```

extra="forbid" on DraftResult means an LLM response with unexpected fields fails validation — it can't silently inject extra keys.


Assembling the Graph

workers/agent/graph/build.py
```python
# workers/agent/graph/build.py
from functools import partial

from langgraph.graph import END, START, StateGraph


def build_graph(deps: Deps):
    g = StateGraph(GraphState)

    g.add_node("classify", partial(triage, deps=deps))
    g.add_node("retrieve", partial(retrieve, deps=deps))
    g.add_node("compose", partial(draft, deps=deps))
    g.add_node("review", partial(guard, deps=deps))
    g.add_node("route", partial(decision, deps=deps))
    g.add_node("repair", partial(repair, deps=deps))

    g.add_edge(START, "classify")
    g.add_edge("classify", "retrieve")
    g.add_edge("retrieve", "compose")
    g.add_edge("compose", "review")
    g.add_edge("review", "route")
    g.add_conditional_edges(
        "route", lambda state: state["decision"],
        {"finalize": END, "escalate": END, "repair": "repair"},
    )
    g.add_edge("repair", "compose")  # corrective pass goes back to draft

    return g.compile()
```

functools.partial binds deps (LLM, retriever, config) into each node when build_graph runs, before the graph is compiled. The nodes themselves are plain functions with no class state, so they are easy to test in isolation.

The repair loop is repair → compose → review → route. It's bounded: the decision node checks repair_count against cfg.max_repair_retries and escalates if exhausted.
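The bound is easy to check with a toy simulation. The sketch below does not use LangGraph; `run_until_done` is a hypothetical driver that walks the compose → review → route cycle, assuming the rule described above (repair while `repair_count` is below the retry budget, otherwise escalate). The `passes_on` parameter is an illustration device: it names the attempt on which the guard passes, with 0 meaning never.

```python
def run_until_done(max_repair_retries: int, passes_on: int = 0) -> tuple[str, int]:
    """Return (final decision, number of drafts composed)."""
    repair_count = 0
    drafts = 0
    while True:
        drafts += 1                               # compose
        guard_passed = drafts == passes_on        # review
        if guard_passed:                          # route
            return "finalize", drafts
        if repair_count < max_repair_retries:
            repair_count += 1                     # repair node increments the counter
            continue                              # back to compose
        return "escalate", drafts


print(run_until_done(1))                # guard never passes, budget of 1
print(run_until_done(3))                # guard never passes, budget of 3
print(run_until_done(1, passes_on=2))   # the repaired draft passes
```

In the worst case the model is called for `max_repair_retries + 1` drafts before the message is escalated, which puts an upper bound on cost per message.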


The Six Nodes

Node 1 — Triage: Classify with a Small Model

workers/agent/graph/nodes/triage.py
```python
# workers/agent/graph/nodes/triage.py
import json

from pydantic import ValidationError

_FALLBACK = TriageResult(
    intent="unknown", category="UNKNOWN",
    sentiment="NEUTRAL", urgency="NORMAL", confidence=0.0
)


def triage(state: GraphState, deps: Deps) -> dict:
    body = state["message"].body
    res = deps.chat.complete(
        deps.cfg.triage_model,
        TRIAGE_SYSTEM,
        triage_user_prompt(body, deps.intents, deps.categories),
        json_mode=True,
    )
    try:
        result = TriageResult.model_validate(json.loads(res.text))
    except (json.JSONDecodeError, ValidationError):
        result = _FALLBACK  # zero confidence → decision node escalates

    return {
        "triage": result,
        "tokens_used": state.get("tokens_used", 0) + res.tokens,
        "cost_cents": state.get("cost_cents", 0.0) + res.cost_cents,
    }
```

Two things worth noting. First, triage uses the small/cheap model (cfg.triage_model) — classification doesn't need a 70B model. Second, on malformed output the fallback sets confidence=0.0, which guarantees the decision node escalates — the system never guesses an intent.

The system prompt is terse and deterministic:

workers/agent/llm/prompts.py
```python
# workers/agent/llm/prompts.py

TRIAGE_SYSTEM = (
    "You are a customer-support triage classifier. Read the customer message and "
    "return ONLY a JSON object with keys: intent, category, sentiment, urgency, "
    "confidence. Choose intent and category from the provided taxonomy (closest "
    "match). sentiment is one of POSITIVE, NEUTRAL, NEGATIVE. urgency is one of "
    "LOW, NORMAL, HIGH. confidence is your certainty in [0,1]. No prose."
)


def triage_user_prompt(body: str, intents: list[str], categories: list[str]) -> str:
    return (
        f"Known intents: {', '.join(sorted(intents)) or '(none)'}\n"
        f"Known categories: {', '.join(sorted(categories)) or '(none)'}\n\n"
        f"Customer message:\n{body}"
    )
```

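The taxonomy (intents, categories) is loaded from the KB at startup, so the prompt offers the model only real labels, which keeps routing accuracy measurable against the eval golden set. Note that TriageResult types intent and category as plain non-empty strings, so the prompt steers the choice rather than enforcing it.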


Node 2 — Retrieve: Hybrid KB Search

workers/agent/graph/nodes/retrieve.py
```python
# workers/agent/graph/nodes/retrieve.py

WEAK_DISTANCE = 0.6  # cosine distance; 1 - cosine_similarity


def retrieve(state: GraphState, deps: Deps) -> dict:
    triage = state["triage"]
    docs = deps.retriever.retrieve(
        query=state["message"].body,
        intent=triage.intent,
        category=triage.category,
    )
    weak = (not docs) or (min(d.distance for d in docs) > WEAK_DISTANCE)
    return {"retrieved": docs, "retrieval_weak": weak}
```

retrieval_weak is the signal the decision node uses to escalate rather than let the model draft from thin context. Cosine distance runs from 0 (same direction) to 2 (opposite direction), so a distance above 0.6 means even the closest chunk has a cosine similarity below 0.4, which is not close enough to ground a confident answer.
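For intuition about the threshold, the following sketch computes cosine distance by hand for a few toy vectors. `cosine_distance` and `is_weak` are hypothetical helpers written for this illustration; the only things taken from the post are the definition (1 - cosine similarity) and the 0.6 cutoff.

```python
import math

WEAK_DISTANCE = 0.6  # same cutoff as in retrieve.py


def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def is_weak(distances: list[float]) -> bool:
    # Mirrors the node: no hits at all, or even the best hit is too far away.
    return (not distances) or min(distances) > WEAK_DISTANCE


query = [1.0, 0.0]
same_direction = cosine_distance(query, [2.0, 0.0])   # similarity  1.0
orthogonal = cosine_distance(query, [0.0, 3.0])       # similarity  0.0
opposite = cosine_distance(query, [-1.0, 0.0])        # similarity -1.0

print(same_direction, orthogonal, opposite)
print(is_weak([orthogonal, opposite]))        # best hit is at distance 1.0
print(is_weak([same_direction, orthogonal]))  # best hit is at distance 0.0
print(is_weak([]))                            # nothing retrieved
```

Because the flag uses the minimum distance, one strong hit is enough to clear it, however weak the rest of the list is.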

The Retriever runs two queries and fuses them:

workers/agent/rag/retriever.py
```python
# workers/agent/rag/retriever.py

class Retriever:
    def retrieve(self, query: str, intent: str, category: str = "") -> list[RetrievedDoc]:
        qvec = self._embeddings.embed([query])[0]
        candidates = self._hybrid(query, qvec, intent)
        if not candidates:
            candidates = self._hybrid(query, qvec, "")  # global fallback
        return lexical_rerank(query, candidates, self._top_k)

    def _hybrid(self, query: str, qvec: list[float], intent: str) -> list[RetrievedDoc]:
        vec = self._vector_search(qvec, intent)
        kw = self._keyword_search(query, qvec, intent)
        # Vector distance wins on overlap (more authoritative than ts_rank).
        by_id: dict[str, RetrievedDoc] = {d.kb_id: d for d in kw}
        by_id.update({d.kb_id: d for d in vec})
        fused = reciprocal_rank_fusion([[d.kb_id for d in vec], [d.kb_id for d in kw]])
        return [by_id[doc_id] for doc_id, _ in fused if doc_id in by_id]
```

Vector half — pgvector cosine ANN over the HNSW index, filtered by intent:

```python
_VECTOR_SQL = (
    "SELECT id::text, intent, category, title, content, embedding <=> %s::vector AS dist "
    "FROM kb_documents "
    "WHERE (%s = '' OR intent = %s) "
    "ORDER BY dist ASC LIMIT %s"
)
```

Keyword half — Postgres FTS over the GIN index (same expression as migration 2, so the index is hit):

```python
_FTS = "to_tsvector('english', title || ' ' || content)"
_KEYWORD_SQL = (
    "SELECT id::text, intent, category, title, content, embedding <=> %s::vector AS dist "
    "FROM kb_documents "
    f"WHERE (%s = '' OR intent = %s) AND {_FTS} @@ plainto_tsquery('english', %s) "
    f"ORDER BY ts_rank({_FTS}, plainto_tsquery('english', %s)) DESC LIMIT %s"
)
```

Note that the keyword query also returns the cosine distance (embedding <=> %s::vector) — not for ranking, but so every returned doc carries a real distance for the retrieval_weak signal.


Reciprocal Rank Fusion

RRF merges the two ranked lists without needing to normalize scores across different scales (cosine distance vs. ts_rank). A document that surfaces in both halves gets a higher fused score.

workers/agent/rag/fusion.py
```python
# workers/agent/rag/fusion.py

RRF_K = 60  # standard damping constant


def reciprocal_rank_fusion(
    ranked_lists: list[list[str]], k: int = RRF_K
) -> list[tuple[str, float]]:
    """Return (id, score) pairs ordered by fused score descending."""
    scores: dict[str, float] = {}
    order: dict[str, int] = {}
    seen = 0
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
            if doc_id not in order:
                order[doc_id] = seen
                seen += 1
    return sorted(scores.items(), key=lambda kv: (-kv[1], order[kv[0]]))
```
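A small worked example makes the scoring concrete. The block below re-declares the function in compact form so it runs on its own; the document ids and the two ranked lists are made up for illustration.

```python
RRF_K = 60


def reciprocal_rank_fusion(ranked_lists, k=RRF_K):
    scores, order = {}, {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
            order.setdefault(doc_id, len(order))  # first-seen order breaks ties
    return sorted(scores.items(), key=lambda kv: (-kv[1], order[kv[0]]))


vector_hits = ["kb-1", "kb-2", "kb-3"]   # hypothetical vector ranking
keyword_hits = ["kb-2", "kb-4"]          # hypothetical keyword ranking

fused = reciprocal_rank_fusion([vector_hits, keyword_hits])
for doc_id, score in fused:
    print(doc_id, round(score, 5))

# Contributions per document:
#   kb-2: 1/62 (rank 2 in vector) + 1/61 (rank 1 in keyword)
#   kb-1: 1/61
#   kb-4: 1/62
#   kb-3: 1/63
```

kb-2 comes out on top even though it leads only one list, because appearing in both lists roughly doubles its score. With k = 60 the gap between adjacent ranks is small, so agreement between the two searches matters more than position within either one.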

After fusion, a lightweight lexical re-ranker blends the fused order with query-term coverage — no cross-encoder model needed, fully offline:

workers/agent/rag/rerank.py
```python
# workers/agent/rag/rerank.py

def lexical_rerank(
    query: str, docs: list[RetrievedDoc], top_k: int, weight: float = 0.5
) -> list[RetrievedDoc]:
    query_terms = _tokens(query)
    n = len(docs)
    scored = []
    for idx, doc in enumerate(docs):
        fused = 1.0 - idx / n                   # fused rank → 0..1
        lexical = _coverage(query_terms, doc)   # term overlap ratio
        blended = weight * lexical + (1.0 - weight) * fused
        scored.append((blended, idx, doc))
    scored.sort(key=lambda s: (-s[0], s[1]))
    return [doc for _, _, doc in scored[:top_k]]
```

weight=0.5 makes it a 50/50 blend of query-term coverage and position in the fused ranking, so neither signal dominates.
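The blend is easiest to follow with numbers. The post does not show `_tokens` or `_coverage`, so the versions below are assumptions: lowercase word sets, and the fraction of query terms found in the title plus content. `Doc` is a hypothetical stand-in for `RetrievedDoc`, and the two documents are invented.

```python
import re
from dataclasses import dataclass


@dataclass
class Doc:  # hypothetical stand-in for RetrievedDoc
    kb_id: str
    title: str
    content: str


def _tokens(text: str) -> set[str]:
    # Assumed tokenizer: lowercase alphanumeric words.
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def _coverage(query_terms: set[str], doc: Doc) -> float:
    # Assumed metric: share of query terms found in the title or content.
    if not query_terms:
        return 0.0
    return len(query_terms & _tokens(f"{doc.title} {doc.content}")) / len(query_terms)


def lexical_rerank(query, docs, top_k, weight=0.5):
    query_terms = _tokens(query)
    n = len(docs)
    scored = []
    for idx, doc in enumerate(docs):
        fused = 1.0 - idx / n
        blended = weight * _coverage(query_terms, doc) + (1.0 - weight) * fused
        scored.append((blended, idx, doc))
    scored.sort(key=lambda s: (-s[0], s[1]))
    return [doc for _, _, doc in scored[:top_k]]


docs = [  # already in fused order
    Doc("kb-1", "Shipping times", "Orders ship within two business days."),
    Doc("kb-2", "Refund policy", "A refund is available within 30 days."),
]
reranked = lexical_rerank("refund policy", docs, top_k=2)
print([d.kb_id for d in reranked])
# kb-1: 0.5 * 0.0 (coverage) + 0.5 * 1.0 (fused) = 0.50
# kb-2: 0.5 * 1.0 (coverage) + 0.5 * 0.5 (fused) = 0.75
```

Here full term coverage is enough to lift the second-ranked document over the first. With a lower `weight`, the fused order would win instead.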


Node 3 — Draft: Grounded Reply with Structured Output

workers/agent/graph/nodes/draft.py
```python
# workers/agent/graph/nodes/draft.py

_GIVE_UP = DraftResult(answer="(unable to produce a grounded answer)", citations=[], confidence=0.0)


def draft(state: GraphState, deps: Deps) -> dict:
    # tool_data is assembled from tool lookups in the full source (elided here).
    prompt = draft_user_prompt(
        state["message"].body,
        state.get("retrieved", []),
        state.get("repair_feedback", ""),  # non-empty on repair pass
        tool_data,
    )

    res = _generate(deps, prompt)
    result = _parse(res)
    tokens, cost = res.tokens, res.cost_cents

    if result is None:  # one retry on invalid structured output
        retry = _generate(deps, prompt)
        result = _parse(retry) or _GIVE_UP
        tokens += retry.tokens  # both attempts count toward accounting
        cost += retry.cost_cents

    return {
        "draft": result,
        "model": deps.cfg.draft_model,
        "tokens_used": state.get("tokens_used", 0) + tokens,
        "cost_cents": state.get("cost_cents", 0.0) + cost,
    }
```

_GIVE_UP has confidence=0.0 and no citations, so it can never be finalized: the guard rejects it for missing citations, and the decision node then repairs or escalates. The pipeline never sends a response the system can't parse.

The draft system prompt enforces grounding at the instruction level:

```python
DRAFT_SYSTEM = (
    "You are a customer-support assistant. Write a reply GROUNDED ONLY in the "
    "provided sources; never use outside knowledge. Cite the sources you used. "
    "You may SUGGEST an action but must NEVER perform an irreversible one (refund, "
    "cancellation, charge, account change); for those, set suggested_action to "
    "'create_human_task'. If the sources do not cover the question, say so plainly "
    "and set confidence low. Return ONLY a JSON object with keys: answer (string), "
    "citations (array of {kb_id, title, snippet}), suggested_action (string or null), "
    "confidence (number in [0,1]). No prose outside the JSON."
)
```

On a repair pass the guard's rejection reasons are injected at the end of the user prompt:

```python
def draft_user_prompt(body, docs, feedback="", tool_data="") -> str:
    sources = "\n\n".join(
        f"[{i+1}] kb_id={d.kb_id} title={d.title}\n{d.content}"
        for i, d in enumerate(docs)
    ) if docs else "(no sources retrieved)"

    parts = [f"Sources:\n{sources}"]
    if tool_data:
        parts.append(f"\nKnown data (from tools):\n{tool_data}")
    parts.append(f"\nCustomer message:\n{body}")
    if feedback:
        parts.append(f"\nThe previous draft was rejected. Fix these issues:\n{feedback}")
    return "\n".join(parts)
```

Node 4 — Guard: Deterministic Policy Checks

The guard runs in code, not the model. The verdict is reproducible, testable, and unaffected by LLM temperature.

workers/agent/graph/nodes/guard.py
```python
# workers/agent/graph/nodes/guard.py

def guard(state: GraphState, deps: Deps) -> dict:
    draft = state["draft"]
    retrieved = state.get("retrieved", [])
    reasons: list[str] = []

    grounded = _check_grounded(draft, retrieved, reasons)
    policy_ok = _check_policy(draft, reasons)
    tone_ok = _check_tone(draft, reasons)

    report = GuardReport(
        grounded=grounded, tone_ok=tone_ok, policy_ok=policy_ok,
        passed=grounded and tone_ok and policy_ok,
        reasons=reasons,
    )
    return {"guard": report}
```

Groundedness check — every kb_id in citations must be a real retrieved source. A fabricated citation (a kb_id the model invented) is a hallucination and fails immediately:

```python
def _check_grounded(draft, retrieved, reasons) -> bool:
    if not draft.answer.strip():
        reasons.append("answer is empty")
        return False
    if not draft.citations:
        reasons.append("answer has no citations (must be grounded in sources)")
        return False
    valid_ids = {d.kb_id for d in retrieved}
    fabricated = [c.kb_id for c in draft.citations if c.kb_id not in valid_ids]
    if fabricated:
        reasons.append(f"citations reference unknown sources: {fabricated}")
        return False
    return True
```

Policy check: suggested_action must be on the allow-list and must not describe an irreversible operation:

workers/agent/policy.py
```python
# workers/agent/policy.py
import re

ALLOWED_ACTIONS = frozenset({
    "", "none", "ask_clarification", "share_kb_article",
    "create_human_task", "escalate_to_human"
})

_FORBIDDEN = re.compile(
    r"\b(refund|charg(e|ing)|cancel(l?ed|l?ing|lation)?|delet(e|ing)|"
    r"reset\s+(the\s+)?password|issue\s+(a\s+)?credit|replace\s+the\s+order|"
    r"ship\s+a\s+replacement|process\s+(a\s+)?return|close\s+the\s+account)\b",
    re.IGNORECASE,
)
```

The model is instructed to set suggested_action = "create_human_task" for irreversible operations. But the guard doesn't trust that instruction — it checks the string itself with a regex. Code enforces the safety invariant; the prompt is a hint.
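To see the two layers side by side, here is a sketch of how the allow-list and the regex might be applied. `proposes_forbidden_action` is referenced by the decision node, but its body is not shown in the post; the version below, along with `policy_ok` and the sample actions, is an assumption made for illustration.

```python
import re

ALLOWED_ACTIONS = frozenset({
    "", "none", "ask_clarification", "share_kb_article",
    "create_human_task", "escalate_to_human",
})

_FORBIDDEN = re.compile(
    r"\b(refund|charg(e|ing)|cancel(l?ed|l?ing|lation)?|delet(e|ing)|"
    r"reset\s+(the\s+)?password|issue\s+(a\s+)?credit|replace\s+the\s+order|"
    r"ship\s+a\s+replacement|process\s+(a\s+)?return|close\s+the\s+account)\b",
    re.IGNORECASE,
)


def proposes_forbidden_action(action):
    # Assumed behavior: any mention of an irreversible operation counts.
    return bool(_FORBIDDEN.search(action or ""))


def policy_ok(action):
    # Assumed behavior: on the allow-list AND free of forbidden wording.
    normalized = (action or "").strip().lower()
    return normalized in ALLOWED_ACTIONS and not proposes_forbidden_action(normalized)


samples = [None, "share_kb_article", "create_human_task",
           "refund the customer", "Cancel the subscription", "send_coupon"]
for action in samples:
    print(repr(action), policy_ok(action), proposes_forbidden_action(action))
```

Under these assumptions an off-list but harmless action such as `send_coupon` fails the policy check without being flagged as forbidden, so it can still go through a repair pass, while anything matching the regex is escalated immediately.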


Node 5 — Decision: Single Routing Authority

workers/agent/graph/nodes/decision.py
```python
# workers/agent/graph/nodes/decision.py

# out(decision, reason) is a small helper elided from this excerpt; it builds
# the partial update carrying `decision` and `escalation_reason`.

def decision(state: GraphState, deps: Deps) -> dict:
    guard = state["guard"]
    draft = state["draft"]
    repair_count = state.get("repair_count", 0)
    threshold = deps.cfg.confidence_threshold
    max_repairs = deps.cfg.max_repair_retries

    if guard.passed:
        if state.get("retrieval_weak", False):
            return out("escalate", "retrieval_weak")
        if draft.confidence < threshold:
            return out("escalate", "low_confidence")
        return out("finalize")

    # Guard failed.
    if proposes_forbidden_action(draft.suggested_action):
        return out("escalate", "forbidden_action")  # terminal: never repair
    if repair_count < max_repairs:
        return out("repair")
    return out("escalate", "repair_exhausted")
```

The routing logic in one diagram:

[Figure: Routing Logic]

One critical rule: a forbidden action is terminal. Even with repair budget remaining, a policy violation escalates immediately — repair must never produce a policy-violating reply that eventually gets sent.
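The branches can also be restated as a pure function, which makes the order of checks easy to test. `route` is a hypothetical flattening of the node that takes plain arguments instead of `GraphState` and `Deps`; the threshold of 0.6 and the retry budget of 1 are illustrative values, not the project's configuration.

```python
def route(*, guard_passed, retrieval_weak, confidence, forbidden,
          repair_count, threshold=0.6, max_repairs=1):
    """Return (decision, escalation_reason), checking in the same order as the node."""
    if guard_passed:
        if retrieval_weak:
            return "escalate", "retrieval_weak"
        if confidence < threshold:
            return "escalate", "low_confidence"
        return "finalize", None
    if forbidden:
        return "escalate", "forbidden_action"   # terminal, even with budget left
    if repair_count < max_repairs:
        return "repair", None
    return "escalate", "repair_exhausted"


base = dict(guard_passed=True, retrieval_weak=False, confidence=0.9,
            forbidden=False, repair_count=0)

print(route(**base))                                            # clean pass
print(route(**{**base, "retrieval_weak": True}))                # thin context
print(route(**{**base, "confidence": 0.3}))                     # unsure model
print(route(**{**base, "guard_passed": False}))                 # budget left
print(route(**{**base, "guard_passed": False, "forbidden": True}))
print(route(**{**base, "guard_passed": False, "repair_count": 1}))
```

Note that a passing guard is necessary but not sufficient: weak retrieval or low confidence still escalates a draft that passed every deterministic check.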


Node 6 — Repair: Package Feedback, Loop Once

workers/agent/graph/nodes/repair.py
```python
# workers/agent/graph/nodes/repair.py

def repair(state: GraphState, deps: Deps) -> dict:
    reasons = state["guard"].reasons
    return {
        "repair_count": state.get("repair_count", 0) + 1,
        "repair_feedback": "; ".join(reasons) if reasons else "improve grounding and clarity",
    }
```

Three lines. All this node does is increment the counter and package the guard's rejection reasons as a string. The draft node reads repair_feedback from state and appends it to the next user prompt — the model gets told exactly what the deterministic guard rejected.


The LLM Layer: One Interface, Two Providers

Nodes depend on the ChatLLM protocol, not a concrete class. Switching from Ollama to OpenAI (or any OpenAI-compatible endpoint) is an env change, no code change.

workers/agent/llm/client.py
```python
# workers/agent/llm/client.py
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class LLMResult:
    text: str
    model: str
    tokens: int
    cost_cents: float


class ChatLLM(Protocol):
    def complete(
        self, model: str, system: str, prompt: str,
        *, json_mode: bool = False, temperature: float = 0.0,
    ) -> LLMResult: ...
```
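Because `ChatLLM` is a `typing.Protocol`, a provider only has to match the method signature; it does not inherit from anything. The sketch below illustrates this with a hypothetical `EchoChat` provider and a hypothetical `ask` caller, neither of which is part of the project.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class LLMResult:
    text: str
    model: str
    tokens: int
    cost_cents: float


class ChatLLM(Protocol):
    def complete(
        self, model: str, system: str, prompt: str,
        *, json_mode: bool = False, temperature: float = 0.0,
    ) -> LLMResult: ...


class EchoChat:  # hypothetical provider; note there is no base class
    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0):
        return LLMResult(text=prompt.upper(), model=model,
                         tokens=len(prompt.split()), cost_cents=0.0)


def ask(chat: ChatLLM, prompt: str) -> LLMResult:
    # Callers depend on the protocol only, so any conforming object works.
    return chat.complete("demo-model", "You are a test double.", prompt)


result = ask(EchoChat(), "hello there")
print(result)
```

The same structural typing is what lets a test double stand in for a real provider without a mocking framework.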

Ollama (local, $0 default):

```python
class OllamaChat:
    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0) -> LLMResult:
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt},
            ],
            "stream": False,
            "options": {"temperature": temperature, "num_predict": self.num_predict},
        }
        if json_mode:
            body["format"] = "json"  # Ollama constrains output to valid JSON
        resp = self._client.post(f"{self.host}/api/chat", json=body)
        resp.raise_for_status()
        data = resp.json()
        tokens = int(data.get("prompt_eval_count", 0)) + int(data.get("eval_count", 0))
        return LLMResult(text=data["message"]["content"], model=model,
                         tokens=tokens, cost_cents=0.0)
```

num_predict caps generated tokens. Without it, a slow CPU-only host can stall for minutes on a single draft and trigger an HTTP timeout, crash-looping the worker.

OpenAI (or any compatible endpoint via OPENAI_BASE_URL):

```python
class OpenAIChat:
    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0) -> LLMResult:
        body = {"model": model, "messages": [...], "temperature": temperature}
        if json_mode:
            body["response_format"] = {"type": "json_object"}
        resp = self._client.post("/chat/completions", json=body)
        resp.raise_for_status()
        data = resp.json()
        tokens = int(data.get("usage", {}).get("total_tokens", 0))
        return LLMResult(text=data["choices"][0]["message"]["content"],
                         model=model, tokens=tokens, cost_cents=...)
```

chat_from_env picks the provider at startup:

```python
def chat_from_env(cfg) -> ChatLLM:
    if cfg.llm_provider == "ollama":
        return OllamaChat(host=cfg.ollama_host, num_predict=cfg.draft_num_predict)
    if cfg.llm_provider == "openai":
        return OpenAIChat(api_key=cfg.openai_api_key, base_url=cfg.openai_base_url)
    raise RuntimeError(f"unsupported chat provider: {cfg.llm_provider!r}")
```

The Redis Consumer: Idempotent, Retryable, Dead-Lettered

workers/agent/bus.py
```python
# workers/agent/bus.py

class Bus:
    def consume(self, handler) -> None:
        self.ensure_group()
        while not self._stop:
            self._reclaim(handler)  # re-deliver any idle (crashed) entries first
            resp = self._rdb.xreadgroup(
                self._cfg.consumer_group, self._cfg.consumer_name,
                {self._cfg.stream_messages: ">"}, count=16, block=self._cfg.block_ms,
            )
            for _stream, entries in resp or []:
                for entry_id, fields in entries:
                    self._dispatch(entry_id, fields, handler)
```

">" means "give me entries not yet delivered to this group" — new messages only. _reclaim calls XAUTOCLAIM to pick up entries that were delivered but never acknowledged (the worker crashed mid-processing).

Delivery budget — each entry gets max_deliveries attempts before it's dead-lettered:

```python
def _dispatch(self, entry_id, fields, handler) -> None:
    # event_id is read from the entry in the full source (elided in this excerpt).
    attempts = self._rdb.hincrby(self._attempts_key, entry_id, 1)
    if attempts > self._cfg.max_deliveries:
        self._dead_letter(entry_id, fields, "max deliveries exceeded")
        self._ack(entry_id, event_id)
        return
    try:
        payload = json.loads(_field(fields, "payload"))
        handler(payload)
        self._ack(entry_id, event_id)
    except Exception:
        # Leave unacked → XAUTOCLAIM will retry on the next _reclaim pass.
        log.exception("handler failed (attempt %d)", attempts)
```

Dead-lettered entries are XACK'd so the group makes progress — a poison message can't block the queue forever.
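The delivery budget can be exercised without Redis. The sketch below is an in-memory simulation, not the `Bus` class: `attempts`, `pending`, `dead_letters`, and `dispatch` are hypothetical stand-ins for the Redis hash counter, the group's pending entries, the dead-letter destination, and `_dispatch`. The budget of 3 is an illustrative value.

```python
MAX_DELIVERIES = 3

attempts = {}        # stands in for the per-entry HINCRBY counter
pending = set()      # delivered but not yet acknowledged
dead_letters = []    # stands in for the dead-letter destination
handler_runs = []


def dispatch(entry_id, handler):
    pending.add(entry_id)
    attempts[entry_id] = attempts.get(entry_id, 0) + 1
    if attempts[entry_id] > MAX_DELIVERIES:
        dead_letters.append(entry_id)
        pending.discard(entry_id)   # acked so the group keeps moving
        return
    try:
        handler(entry_id)
        pending.discard(entry_id)   # acked on success
    except Exception:
        pass                        # left pending; a later reclaim pass retries


def poison(entry_id):
    handler_runs.append(entry_id)
    raise ValueError("cannot process this entry")


dispatch("1-0", poison)             # first delivery
while "1-0" in pending:             # each iteration models one reclaim pass
    dispatch("1-0", poison)

print(len(handler_runs), dead_letters, pending)
```

The handler runs exactly `MAX_DELIVERIES` times; the next delivery skips the handler, dead-letters the entry, and acknowledges it, which is what clears the poison message from the pending set.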


The Entrypoint: Wiring + Idempotent Handler

main.py wires everything and defines the per-message handler:

workers/agent/main.py
```python
# workers/agent/main.py

def _make_handler(deps, graph, conn, bus, rdb, tracer):
    def handle(payload: dict) -> None:
        event_id = payload.get("event_id", "")

        # Idempotent: a redelivered event must not produce a second draft.
        marker = f"processed:{event_id}"
        if not rdb.set(marker, "1", nx=True, ex=_PROCESSED_TTL_S):
            log.info("skipping already-processed event %s", event_id)
            return

        try:
            msg = MessageCreated.model_validate(payload)
            with message_span(tracer, "process_message", msg.trace_id) as span:
                state: GraphState = {"message": msg, "trace_id": msg.trace_id, "repair_count": 0}
                t0 = time.monotonic()
                final = graph.invoke(state, {"recursion_limit": deps.cfg.max_graph_steps})
                latency_ms = int((time.monotonic() - t0) * 1000)

                draft_id = db.insert_draft(conn, db.draft_row_from_state(final, latency_ms))
                bus.publish_draft(_outbound_payload(final, draft_id))

                span.set_attribute("draft.decision", final["decision"])
                span.set_attribute("draft.tokens", final.get("tokens_used", 0))
        except Exception:
            rdb.delete(marker)  # release marker so the bus can retry
            raise

    return handle
```

rdb.set(marker, "1", nx=True, ex=TTL) is an atomic Redis set-if-not-exists. With redis-py it returns True when the key was written and None when the key already exists, so a falsy result means an earlier delivery has already claimed this event and the graph is not re-run. nx=True is the critical flag: it makes the check-and-set a single atomic operation, so two concurrent workers that pick up the same redelivered entry cannot both proceed.

If the handler raises, rdb.delete(marker) releases the marker so the bus can retry. If the failure happened before insert_draft, no draft was inserted and the retry starts clean.
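The marker protocol can be sketched with an in-memory stand-in. `FakeRedis` below is hypothetical and implements only the two calls the handler uses; it mimics redis-py's convention of returning `True` when `SET ... NX` writes the key and `None` when the key already exists, and it ignores the TTL. `make_handler` is a simplified version of the handler with the graph call replaced by a `process` callback.

```python
class FakeRedis:
    def __init__(self):
        self._data = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self._data:
            return None            # key exists: nothing written
        self._data[key] = value
        return True

    def delete(self, key):
        return 1 if self._data.pop(key, None) is not None else 0


def make_handler(rdb, process):
    def handle(payload):
        marker = f"processed:{payload['event_id']}"
        if not rdb.set(marker, "1", nx=True, ex=3600):
            return "skipped"
        try:
            process(payload)
        except Exception:
            rdb.delete(marker)     # release so a redelivery can retry
            raise
        return "processed"
    return handle


seen = []
handle = make_handler(FakeRedis(), seen.append)
first = handle({"event_id": "evt-1"})
second = handle({"event_id": "evt-1"})      # redelivery of the same event
print(first, second, len(seen))

flaky_calls = []

def flaky(payload):
    flaky_calls.append(payload)
    if len(flaky_calls) == 1:
        raise RuntimeError("transient failure")

handle_flaky = make_handler(FakeRedis(), flaky)
try:
    handle_flaky({"event_id": "evt-2"})
except RuntimeError:
    pass
retry = handle_flaky({"event_id": "evt-2"})  # marker was released, so this runs
print(retry, len(flaky_calls))
```

The first half shows a duplicate delivery being skipped. The second shows why the `except` branch matters: without the delete, the retry would be skipped and the event silently lost.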


Tests: Zero Network, Zero DB

All node tests use FakeChat (returns canned JSON) and FakeRetriever (returns fixed docs). No mocking framework needed.

workers/agent/tests/fakes.py
```python
# workers/agent/tests/fakes.py

class FakeChat:
    """Pops draft responses in order so tests can stage 'invalid then valid'."""
    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0):
        if "triage classifier" in system:
            return LLMResult(self.triage, model, 10, 0.0)
        text = self.drafts.pop(0) if self.drafts else draft_json()
        return LLMResult(text, model, 20, 0.0)


class FakeRetriever:
    def retrieve(self, query, intent, category=""):
        return list(self.docs)
```

workers/agent/tests/test_nodes.py
```python
# workers/agent/tests/test_nodes.py

class TriageNode(unittest.TestCase):
    def test_invalid_output_escalates_via_zero_confidence(self):
        # Why: garbled classifier response → zero confidence → decision escalates.
        out = triage({"message": make_message()},
                     deps_with(chat=FakeChat(triage="not json")))
        self.assertEqual(out["triage"].intent, "unknown")
        self.assertEqual(out["triage"].confidence, 0.0)


class GuardNode(unittest.TestCase):
    def test_fabricated_citation_fails_grounding(self):
        # Why: kb-99 was never retrieved; it's a hallucinated source.
        r = self._guard(DraftResult(
            answer="x",
            citations=[{"kb_id": "kb-99", "title": "?", "snippet": "x"}],
            confidence=0.8, suggested_action="share_kb_article"))
        self.assertFalse(r.grounded)
        self.assertFalse(r.passed)

    def test_forbidden_action_fails_policy(self):
        r = self._guard(DraftResult(
            answer="Done.",
            citations=[{"kb_id": "kb-1", "title": "t", "snippet": "x"}],
            confidence=0.9, suggested_action="refund the customer"))
        self.assertFalse(r.policy_ok)


class DecisionNode(unittest.TestCase):
    def test_forbidden_action_is_terminal_no_repair(self):
        # Why: policy violation must never be repaired into a sent reply.
        out = decision(
            _state(passed=False, action="refund now", repair_count=0),
            deps_with())
        self.assertEqual(out["decision"], "escalate")
        self.assertEqual(out["escalation_reason"], "forbidden_action")

    def test_guard_fail_escalates_when_exhausted(self):
        out = decision(_state(passed=False, repair_count=1), deps_with())
        self.assertEqual(out["escalation_reason"], "repair_exhausted")


class DraftNode(unittest.TestCase):
    def test_invalid_then_valid_uses_retry(self):
        # Why: PRD §12, one retry on invalid structured output.
        out = self._draft(["not json", draft_json(confidence=0.7)])
        self.assertEqual(out["draft"].confidence, 0.7)
        self.assertEqual(out["tokens_used"], 40)  # both attempts counted

    def test_invalid_twice_gives_up_at_zero_confidence(self):
        out = self._draft(["bad", "still bad"])
        self.assertEqual(out["draft"].confidence, 0.0)
```

Run the Tests

```bash
cd resolver_code/workers/agent

# All worker tests
python -m pytest tests/ -v

# Just node unit tests
python -m pytest tests/test_nodes.py -v

# Just RAG tests
python -m pytest tests/test_rag.py -v
```

Complete Message Flow Through the Worker

[Figure: Complete message flow through the worker]

Blog series · 6 parts

Let's Build a Customer Support Co-Pilot

an Event-Driven AI Agent with LangGraph, Go, pgvector & Redis Streams
