Let's Build a Customer Support AI Copilot: An Event-Driven Agent with LangGraph, Go, pgvector & Redis Streams [Part 3]

Karan Kashyap
June 26, 2026
Part 3: The LangGraph Agent — Triage, RAG Retrieval & Grounded Drafting
Event-driven AI pipeline: idempotent consumer groups, a six-node LangGraph state machine, hybrid RAG retrieval, deterministic guard rails, and confidence-based routing — all in Python.
Package Layout
```text
workers/agent/
├── main.py              # entrypoint: wire → consume → handle → repeat
├── bus.py               # Redis Streams consumer group (XREADGROUP + XAUTOCLAIM)
├── config.py            # env config, validated at startup
├── db.py                # psycopg connection + draft insert
├── schemas.py           # Pydantic types: wire events + node outputs + GraphState
├── policy.py            # forbidden-action allow-list (deterministic, not the model)
├── obs.py               # OTel tracing
├── graph/
│   ├── build.py         # assemble + compile the StateGraph
│   ├── deps.py          # Deps dataclass: wires LLM, retriever, config into nodes
│   └── nodes/
│       ├── triage.py    # classify intent / category / sentiment / urgency
│       ├── retrieve.py  # hybrid KB search, weak-retrieval flag
│       ├── draft.py     # grounded reply generation (structured output + retry)
│       ├── guard.py     # deterministic groundedness + policy + tone checks
│       ├── decision.py  # single routing authority: finalize | repair | escalate
│       └── repair.py    # increment counter, package guard feedback
├── rag/
│   ├── retriever.py     # Retriever: vector + keyword → RRF → lexical rerank
│   ├── fusion.py        # Reciprocal Rank Fusion
│   └── rerank.py        # query-term coverage blended rerank
├── llm/
│   ├── client.py        # ChatLLM + Embeddings protocols, Ollama + OpenAI impls
│   └── prompts.py       # versioned system + user prompt templates
└── tests/
    ├── fakes.py         # FakeChat, FakeRetriever, builders (zero network/DB)
    └── test_nodes.py    # unit tests per node
```
The Shared State
Every node in the graph reads from and writes to a single GraphState TypedDict. LangGraph threads this state through the graph: each node returns a partial update that is merged into the state, and an absent key means "not computed yet."
```python
# workers/agent/schemas.py
from typing import Literal, Optional, TypedDict

# MessageCreated, TriageResult, RetrievedDoc, DraftResult, GuardReport and
# EscalationReason are defined earlier in this module.


class GraphState(TypedDict, total=False):
    message: MessageCreated             # the inbound event
    trace_id: str                       # OTel trace id from the API (continues the trace)

    triage: TriageResult                # populated by: classify
    retrieved: list[RetrievedDoc]       # populated by: retrieve
    retrieval_weak: bool                # True = best hit too distant → escalate, don't guess

    draft: DraftResult                  # populated by: compose
    guard: GuardReport                  # populated by: review

    repair_count: int                   # how many repair passes so far
    repair_feedback: str                # guard reasons → fed back to the draft prompt

    decision: Literal["finalize", "repair", "escalate"]
    escalation_reason: Optional[EscalationReason]

    # Accounting for the drafts row.
    model: str
    tokens_used: int
    cost_cents: float
```
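total=False makes every key optional, so the initial state only has to carry the inbound message, trace_id and repair_count. Nodes read keys that may be missing with a default, such as state.get("retrieved", []), instead of checking for None placeholders.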
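To make the partial-update model concrete, here is a stdlib-only sketch of how updates accumulate. It is not LangGraph itself: MiniState, classify_node, draft_node and run_nodes are hypothetical names, and the merge mirrors LangGraph's default behavior for keys without a reducer (the last write wins).

```python
from typing import TypedDict


class MiniState(TypedDict, total=False):
    body: str
    category: str
    tokens_used: int


def classify_node(state: MiniState) -> dict:
    # Returns only the keys it computes; tokens_used may not exist yet.
    return {"category": "billing", "tokens_used": state.get("tokens_used", 0) + 10}


def draft_node(state: MiniState) -> dict:
    return {"tokens_used": state.get("tokens_used", 0) + 20}


def run_nodes(state: MiniState, nodes) -> MiniState:
    for node in nodes:
        # Merge the partial update: untouched keys survive, returned keys overwrite.
        state = {**state, **node(state)}
    return state


final = run_nodes({"body": "Why was my invoice charged twice?"}, [classify_node, draft_node])
```

Each node only sees and returns what it needs, and tokens_used grows across nodes the same way the real triage and draft nodes accumulate accounting.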
Each node's structured output is a separate Pydantic model, validated on construction:
```python
from typing import Literal, Optional

from pydantic import BaseModel, Field

Sentiment = Literal["POSITIVE", "NEUTRAL", "NEGATIVE"]
Urgency = Literal["LOW", "NORMAL", "HIGH"]


class TriageResult(BaseModel):
    intent: str = Field(min_length=1)
    category: str = Field(min_length=1)
    sentiment: Sentiment
    urgency: Urgency
    confidence: float = Field(ge=0.0, le=1.0)


class DraftResult(BaseModel):
    model_config = {"extra": "forbid"}
    answer: str = Field(min_length=1)
    citations: list[Citation] = Field(default_factory=list)  # Citation {kb_id, title, snippet}, defined elsewhere
    suggested_action: Optional[str] = None
    confidence: float = Field(ge=0.0, le=1.0)


class GuardReport(BaseModel):
    grounded: bool
    tone_ok: bool
    policy_ok: bool
    passed: bool  # AND of the three checks
    reasons: list[str] = Field(default_factory=list)
```
extra="forbid" on DraftResult means an LLM response with unexpected fields fails validation — it can't silently inject extra keys.
Assembling the Graph
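```python
# workers/agent/graph/build.py
from functools import partial

from langgraph.graph import END, START, StateGraph

from ..schemas import GraphState
from .deps import Deps
from .nodes.decision import decision
from .nodes.draft import draft
from .nodes.guard import guard
from .nodes.repair import repair
from .nodes.retrieve import retrieve
from .nodes.triage import triage


def build_graph(deps: Deps):
    g = StateGraph(GraphState)

    # Labels deliberately differ from state keys (draft, guard, decision):
    # LangGraph refuses a node name that collides with a state key.
    g.add_node("classify", partial(triage, deps=deps))
    g.add_node("retrieve", partial(retrieve, deps=deps))
    g.add_node("compose", partial(draft, deps=deps))
    g.add_node("review", partial(guard, deps=deps))
    g.add_node("route", partial(decision, deps=deps))
    g.add_node("repair", partial(repair, deps=deps))

    g.add_edge(START, "classify")
    g.add_edge("classify", "retrieve")
    g.add_edge("retrieve", "compose")
    g.add_edge("compose", "review")
    g.add_edge("review", "route")
    g.add_conditional_edges(
        "route", lambda state: state["decision"],
        {"finalize": END, "escalate": END, "repair": "repair"},
    )
    g.add_edge("repair", "compose")  # corrective pass goes back to draft

    return g.compile()
```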
functools.partial binds deps (LLM, retriever, config) into each node when the graph is built, so LangGraph sees an ordinary function of the state. The nodes themselves are plain functions with no class state, which makes them easy to test in isolation.
The repair loop is repair → compose → review → route. It's bounded: the decision node checks repair_count against cfg.max_repair_retries and escalates if exhausted.
The Six Nodes
Node 1 — Triage: Classify with a Small Model
```python
# workers/agent/graph/nodes/triage.py
import json

from pydantic import ValidationError

from ...llm.prompts import TRIAGE_SYSTEM, triage_user_prompt
from ...schemas import GraphState, TriageResult
from ..deps import Deps

_FALLBACK = TriageResult(
    intent="unknown", category="UNKNOWN",
    sentiment="NEUTRAL", urgency="NORMAL", confidence=0.0,
)


def triage(state: GraphState, deps: Deps) -> dict:
    body = state["message"].body
    res = deps.chat.complete(
        deps.cfg.triage_model,
        TRIAGE_SYSTEM,
        triage_user_prompt(body, deps.intents, deps.categories),
        json_mode=True,
    )
    try:
        result = TriageResult.model_validate(json.loads(res.text))
    except (json.JSONDecodeError, ValidationError):
        result = _FALLBACK  # zero confidence: the classification is untrusted

    return {
        "triage": result,
        # Accounting accumulates across nodes (and across repair passes).
        "tokens_used": state.get("tokens_used", 0) + res.tokens,
        "cost_cents": state.get("cost_cents", 0.0) + res.cost_cents,
    }
```
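Two things are worth noting. First, triage uses the small, cheap model (cfg.triage_model): classification doesn't need a 70B model. Second, on malformed output the fallback sets intent="unknown" and confidence=0.0, so the system records "unknown" rather than guessing an intent. The zero confidence is meant as the escalation signal; note that the Node 5 excerpt below compares only draft.confidence to the threshold, so escalating on triage.confidence relies on a check that excerpt doesn't show.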
The system prompt is terse and pins the output to a fixed JSON shape:
```python
# workers/agent/llm/prompts.py

TRIAGE_SYSTEM = (
    "You are a customer-support triage classifier. Read the customer message and "
    "return ONLY a JSON object with keys: intent, category, sentiment, urgency, "
    "confidence. Choose intent and category from the provided taxonomy (closest "
    "match). sentiment is one of POSITIVE, NEUTRAL, NEGATIVE. urgency is one of "
    "LOW, NORMAL, HIGH. confidence is your certainty in [0,1]. No prose."
)


def triage_user_prompt(body: str, intents: list[str], categories: list[str]) -> str:
    return (
        f"Known intents: {', '.join(sorted(intents)) or '(none)'}\n"
        f"Known categories: {', '.join(sorted(categories)) or '(none)'}\n\n"
        f"Customer message:\n{body}"
    )
```
The taxonomy (intents, categories) is loaded from the KB at startup, so the model is offered only real labels, which keeps routing accuracy measurable against the eval golden set.
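TriageResult only requires intent and category to be non-empty strings, so nothing in the validation shown rejects a label outside the taxonomy. One way to enforce membership after parsing is to snap the model's label onto the known set. The sketch below is a hypothetical helper (snap_to_taxonomy is not part of the codebase) built on the standard library's difflib.get_close_matches; anything that doesn't match closely falls back to a sentinel such as "unknown".

```python
import difflib


def snap_to_taxonomy(label: str, known: list[str], fallback: str, cutoff: float = 0.8) -> str:
    """Map a model-produced label onto the closest known label, or return fallback."""
    if label in known:
        return label
    # Case-insensitive exact match before fuzzy matching.
    by_lower = {k.lower(): k for k in known}
    key = label.strip().lower()
    if key in by_lower:
        return by_lower[key]
    # Fuzzy match on similarity ratio; the cutoff keeps unrelated labels out.
    matches = difflib.get_close_matches(key, list(by_lower), n=1, cutoff=cutoff)
    return by_lower[matches[0]] if matches else fallback


INTENTS = ["refund_request", "shipping_delay", "password_reset"]
```

A label snapped to the fallback can then be treated like the malformed-output case rather than routed on an invented intent.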
Node 2 — Retrieve: Hybrid KB Search
```python
# workers/agent/graph/nodes/retrieve.py
from ...schemas import GraphState
from ..deps import Deps

WEAK_DISTANCE = 0.6  # cosine distance (1 - cosine similarity); above 0.6 = similarity below 0.4


def retrieve(state: GraphState, deps: Deps) -> dict:
    triage = state["triage"]
    docs = deps.retriever.retrieve(
        query=state["message"].body,
        intent=triage.intent,
        category=triage.category,
    )
    # Weak if nothing came back, or even the nearest chunk is too far away.
    weak = (not docs) or (min(d.distance for d in docs) > WEAK_DISTANCE)
    return {"retrieved": docs, "retrieval_weak": weak}
```
retrieval_weak is the signal the decision node uses to escalate instead of finalizing a reply drafted from thin context. pgvector's <=> operator returns cosine distance, 1 - cosine similarity, which ranges from 0 (same direction) to 2 (opposite). Distance > 0.6 therefore means even the closest chunk has cosine similarity below 0.4: not close enough to ground a confident answer.
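The arithmetic behind the threshold, as a plain-Python sketch (cosine_distance and is_weak are hypothetical helpers, not pgvector's implementation): the distance is 0 for vectors pointing the same way, 1 for orthogonal ones and 2 for opposite ones, and the weak rule is the same one the retrieve node applies.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity: the quantity pgvector's <=> operator returns."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


def is_weak(distances: list[float], threshold: float = 0.6) -> bool:
    # Same rule as the retrieve node: no hits, or even the best hit is too far.
    return (not distances) or min(distances) > threshold
```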
The Retriever runs two queries and fuses them:
```python
# workers/agent/rag/retriever.py
from ..schemas import RetrievedDoc
from .fusion import reciprocal_rank_fusion
from .rerank import lexical_rerank


class Retriever:
    # __init__ (not shown) holds the DB connection, the Embeddings client and top_k.

    def retrieve(self, query: str, intent: str, category: str = "") -> list[RetrievedDoc]:
        qvec = self._embeddings.embed([query])[0]
        candidates = self._hybrid(query, qvec, intent)
        if not candidates:
            candidates = self._hybrid(query, qvec, "")  # global fallback: drop the intent filter
        return lexical_rerank(query, candidates, self._top_k)

    def _hybrid(self, query: str, qvec: list[float], intent: str) -> list[RetrievedDoc]:
        vec = self._vector_search(qvec, intent)
        kw = self._keyword_search(query, qvec, intent)
        # Vector distance wins on overlap (more authoritative than ts_rank).
        by_id: dict[str, RetrievedDoc] = {d.kb_id: d for d in kw}
        by_id.update({d.kb_id: d for d in vec})
        fused = reciprocal_rank_fusion([[d.kb_id for d in vec], [d.kb_id for d in kw]])
        return [by_id[doc_id] for doc_id, _ in fused if doc_id in by_id]
```
Vector half — pgvector cosine ANN over the HNSW index, filtered by intent:
```python
# Parameters, in order: query vector, intent, intent, limit.
_VECTOR_SQL = (
    "SELECT id::text, intent, category, title, content, embedding <=> %s::vector AS dist "
    "FROM kb_documents "
    "WHERE (%s = '' OR intent = %s) "
    "ORDER BY dist ASC LIMIT %s"
)
```
Keyword half — Postgres FTS over the GIN index (same expression as migration 2, so the index is hit):
```python
_FTS = "to_tsvector('english', title || ' ' || content)"

# Parameters, in order: query vector, intent, intent, query text, query text, limit.
_KEYWORD_SQL = (
    "SELECT id::text, intent, category, title, content, embedding <=> %s::vector AS dist "
    "FROM kb_documents "
    f"WHERE (%s = '' OR intent = %s) AND {_FTS} @@ plainto_tsquery('english', %s) "
    f"ORDER BY ts_rank({_FTS}, plainto_tsquery('english', %s)) DESC LIMIT %s"
)
```
Note that the keyword query also returns the cosine distance (embedding <=> %s::vector) — not for ranking, but so every returned doc carries a real distance for the retrieval_weak signal.
Reciprocal Rank Fusion
RRF merges the two ranked lists without needing to normalize scores across different scales (cosine distance vs. ts_rank). A document that surfaces in both halves gets a higher fused score.
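Written out, the score the fusion function computes for a document d sums over the ranked lists L_i that contain it, where r_i(d) is the 1-based rank of d in list i (the code's rank + 1) and k = 60. A worked example with hypothetical documents: A is 1st in the vector list and 3rd in the keyword list, B is 2nd in the vector list only, and C is 1st in the keyword list only.

```latex
\mathrm{RRF}(d) = \sum_{i \,:\, d \in L_i} \frac{1}{k + r_i(d)}, \qquad k = 60

\mathrm{RRF}(A) = \tfrac{1}{61} + \tfrac{1}{63} \approx 0.03227, \quad
\mathrm{RRF}(C) = \tfrac{1}{61} \approx 0.01639, \quad
\mathrm{RRF}(B) = \tfrac{1}{62} \approx 0.01613
```

A and C share the same best rank, but A outranks C because it collects a contribution from both lists. Raw cosine distances and ts_rank values never enter the formula, which is why no score normalization is needed.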
```python
# workers/agent/rag/fusion.py

RRF_K = 60  # standard damping constant


def reciprocal_rank_fusion(
    ranked_lists: list[list[str]], k: int = RRF_K
) -> list[tuple[str, float]]:
    """Return (id, score) pairs ordered by fused score descending."""
    scores: dict[str, float] = {}
    order: dict[str, int] = {}  # first-seen position, used to break score ties
    seen = 0
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
            if doc_id not in order:
                order[doc_id] = seen
                seen += 1
    return sorted(scores.items(), key=lambda kv: (-kv[1], order[kv[0]]))
```
After fusion, a lightweight lexical re-ranker blends the fused order with query-term coverage — no cross-encoder model needed, fully offline:
```python
# workers/agent/rag/rerank.py
from ..schemas import RetrievedDoc


def lexical_rerank(
    query: str, docs: list[RetrievedDoc], top_k: int, weight: float = 0.5
) -> list[RetrievedDoc]:
    query_terms = _tokens(query)  # _tokens / _coverage: module helpers, not shown
    n = len(docs)
    scored = []
    for idx, doc in enumerate(docs):
        fused = 1.0 - idx / n                  # fused rank → (0, 1], top hit = 1.0
        lexical = _coverage(query_terms, doc)  # term overlap ratio
        blended = weight * lexical + (1.0 - weight) * fused
        scored.append((blended, idx, doc))
    scored.sort(key=lambda s: (-s[0], s[1]))  # ties keep the fused order
    return [doc for _, _, doc in scored[:top_k]]
```
weight=0.5 makes it an even blend of lexical coverage and fused rank position, so neither the term-overlap signal nor the RRF order dominates.
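Since _tokens and _coverage aren't shown, here is a self-contained sketch of the same blend with assumed definitions: tokens are lowercase alphanumeric runs, and coverage is the fraction of query terms present in the document. Documents are plain strings instead of RetrievedDoc objects, and tokens, coverage and blend_rerank are hypothetical names.

```python
import re


def tokens(text: str) -> set[str]:
    # Assumed tokenizer: lowercase alphanumeric runs.
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def coverage(query_terms: set[str], doc_text: str) -> float:
    # Fraction of query terms that appear in the document.
    if not query_terms:
        return 0.0
    return len(query_terms & tokens(doc_text)) / len(query_terms)


def blend_rerank(query: str, docs: list[str], top_k: int, weight: float = 0.5) -> list[str]:
    q = tokens(query)
    n = len(docs)
    scored = []
    for idx, doc in enumerate(docs):
        fused = 1.0 - idx / n  # position in fused order → (0, 1]
        scored.append((weight * coverage(q, doc) + (1.0 - weight) * fused, idx, doc))
    scored.sort(key=lambda s: (-s[0], s[1]))
    return [doc for _, _, doc in scored[:top_k]]


docs = ["Shipping times for orders", "How to reset your password", "Billing FAQ"]
```

For the query "reset my password", the password article enters second in fused order but covers two of the three query terms, so the blend lifts it above the first-ranked shipping article (about 0.667 vs 0.5).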
Node 3 — Draft: Grounded Reply with Structured Output
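```python
# workers/agent/graph/nodes/draft.py
from ...llm.prompts import draft_user_prompt
from ...schemas import DraftResult, GraphState
from ..deps import Deps

_GIVE_UP = DraftResult(answer="(unable to produce a grounded answer)", citations=[], confidence=0.0)


def draft(state: GraphState, deps: Deps) -> dict:
    prompt = draft_user_prompt(
        state["message"].body,
        state.get("retrieved", []),
        state.get("repair_feedback", ""),  # non-empty on a repair pass
        tool_data="",  # where tool results come from isn't shown here; "" is the helper's default
    )

    # _generate / _parse are module helpers (not shown); _parse returns None
    # when the model's output is not a valid DraftResult.
    res = _generate(deps, prompt)
    tokens, cost = res.tokens, res.cost_cents
    result = _parse(res)

    if result is None:  # one retry on invalid structured output
        retry = _generate(deps, prompt)
        tokens += retry.tokens  # both attempts are billed
        cost += retry.cost_cents
        result = _parse(retry) or _GIVE_UP

    return {
        "draft": result,
        "model": deps.cfg.draft_model,
        "tokens_used": state.get("tokens_used", 0) + tokens,
        "cost_cents": state.get("cost_cents", 0.0) + cost,
    }
```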
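_GIVE_UP carries no citations and confidence=0.0, so it can never pass the guard and be finalized: the groundedness check rejects it, the decision node sends it through repair while budget remains, and it escalates once the budget is spent. The pipeline never sends a response the system can't parse.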
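The one-retry pattern and its token accounting, reduced to the standard library. The names parse_draft, GIVE_UP and draft_with_retry are hypothetical, and parse_draft stands in for _parse with a much weaker check (a JSON object with a non-empty answer) than full DraftResult validation. The staged responses mirror the DraftNode tests: an invalid reply followed by a valid one bills both attempts.

```python
import json
from typing import Callable, Optional


def parse_draft(text: str) -> Optional[dict]:
    """Assumed parser: a JSON object with a non-empty answer, else None."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or not str(data.get("answer", "")).strip():
        return None
    return data


GIVE_UP = {"answer": "(unable to produce a grounded answer)", "citations": [], "confidence": 0.0}


def draft_with_retry(generate: Callable[[], tuple[str, int]]) -> tuple[dict, int]:
    """generate() returns (model_text, tokens); retries exactly once on invalid output."""
    text, tokens = generate()
    result = parse_draft(text)
    if result is None:
        text, retry_tokens = generate()
        tokens += retry_tokens  # both attempts count toward the bill
        result = parse_draft(text) or GIVE_UP
    return result, tokens
```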
The draft system prompt asks for grounding at the instruction level; the guard node then checks it in code:
```python
DRAFT_SYSTEM = (
    "You are a customer-support assistant. Write a reply GROUNDED ONLY in the "
    "provided sources — never use outside knowledge. Cite the sources you used. "
    "You may SUGGEST an action but must NEVER perform an irreversible one (refund, "
    "cancellation, charge, account change); for those, set suggested_action to "
    "'create_human_task'. If the sources do not cover the question, say so plainly "
    "and set confidence low. Return ONLY a JSON object with keys: answer (string), "
    "citations (array of {kb_id, title, snippet}), suggested_action (string or null), "
    "confidence (number in [0,1]). No prose outside the JSON."
)
```
On a repair pass the guard's rejection reasons are injected at the end of the user prompt:
```python
def draft_user_prompt(body, docs, feedback="", tool_data="") -> str:
    sources = "\n\n".join(
        f"[{i+1}] kb_id={d.kb_id} title={d.title}\n{d.content}"
        for i, d in enumerate(docs)
    ) if docs else "(no sources retrieved)"

    parts = [f"Sources:\n{sources}"]
    if tool_data:
        parts.append(f"\nKnown data (from tools):\n{tool_data}")
    parts.append(f"\nCustomer message:\n{body}")
    if feedback:
        parts.append(f"\nThe previous draft was rejected. Fix these issues:\n{feedback}")
    return "\n".join(parts)
```
Node 4 — Guard: Deterministic Policy Checks
The guard runs in code, not the model. The verdict is reproducible, testable, and unaffected by LLM temperature.
```python
# workers/agent/graph/nodes/guard.py
from ...schemas import GraphState, GuardReport
from ..deps import Deps


def guard(state: GraphState, deps: Deps) -> dict:
    draft = state["draft"]
    retrieved = state.get("retrieved", [])
    reasons: list[str] = []

    # Each check appends its own rejection reasons; all three always run.
    grounded = _check_grounded(draft, retrieved, reasons)
    policy_ok = _check_policy(draft, reasons)
    tone_ok = _check_tone(draft, reasons)

    report = GuardReport(
        grounded=grounded, tone_ok=tone_ok, policy_ok=policy_ok,
        passed=grounded and tone_ok and policy_ok,
        reasons=reasons,
    )
    return {"guard": report}
```
Groundedness check — every kb_id in citations must be a real retrieved source. A fabricated citation (a kb_id the model invented) is a hallucination and fails immediately:
```python
def _check_grounded(draft, retrieved, reasons) -> bool:
    if not draft.answer.strip():
        reasons.append("answer is empty")
        return False
    if not draft.citations:
        reasons.append("answer has no citations (must be grounded in sources)")
        return False
    # Every cited kb_id must be one of the documents actually retrieved.
    valid_ids = {d.kb_id for d in retrieved}
    fabricated = [c.kb_id for c in draft.citations if c.kb_id not in valid_ids]
    if fabricated:
        reasons.append(f"citations reference unknown sources: {fabricated}")
        return False
    return True
```
Policy check — suggested_action must be on the allow-list and must not describe an irreversible operation:
```python
# workers/agent/policy.py
import re

ALLOWED_ACTIONS = frozenset({
    "", "none", "ask_clarification", "share_kb_article",
    "create_human_task", "escalate_to_human",
})

_FORBIDDEN = re.compile(
    r"\b(refund|charg(e|ing)|cancel(l?ed|l?ing|lation)?|delet(e|ing)|"
    r"reset\s+(the\s+)?password|issue\s+(a\s+)?credit|replace\s+the\s+order|"
    r"ship\s+a\s+replacement|process\s+(a\s+)?return|close\s+the\s+account)\b",
    re.IGNORECASE,
)
```
The model is instructed to set suggested_action = "create_human_task" for irreversible operations. But the guard doesn't trust that instruction — it checks the string itself with a regex. Code enforces the safety invariant; the prompt is a hint.
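The excerpt shows the constants, not the functions that use them. A minimal sketch of how the two checks might combine, assuming hypothetical implementations of proposes_forbidden_action (the name the decision node calls) and a policy_reasons helper standing in for _check_policy. One detail it surfaces: \b treats underscores as word characters, so a snake_case action such as issue_refund slips past the regex and is caught only by the allow-list, which is a good reason to keep both checks.

```python
import re
from typing import Optional

ALLOWED_ACTIONS = frozenset({
    "", "none", "ask_clarification", "share_kb_article",
    "create_human_task", "escalate_to_human",
})

_FORBIDDEN = re.compile(
    r"\b(refund|charg(e|ing)|cancel(l?ed|l?ing|lation)?|delet(e|ing)|"
    r"reset\s+(the\s+)?password|issue\s+(a\s+)?credit|replace\s+the\s+order|"
    r"ship\s+a\s+replacement|process\s+(a\s+)?return|close\s+the\s+account)\b",
    re.IGNORECASE,
)


def proposes_forbidden_action(action: Optional[str]) -> bool:
    """Hypothetical: True if the action text describes an irreversible operation."""
    return bool(action) and _FORBIDDEN.search(action) is not None


def policy_reasons(action: Optional[str]) -> list[str]:
    """Hypothetical core of _check_policy: collect every policy violation."""
    normalized = (action or "").strip().lower()
    reasons = []
    if normalized not in ALLOWED_ACTIONS:
        reasons.append(f"suggested_action not on allow-list: {action!r}")
    if proposes_forbidden_action(action):
        reasons.append(f"suggested_action describes an irreversible operation: {action!r}")
    return reasons
```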
Node 5 — Decision: Single Routing Authority
```python
# workers/agent/graph/nodes/decision.py
from ...policy import proposes_forbidden_action
from ...schemas import GraphState
from ..deps import Deps

# out(decision, reason=None) (not shown) returns the partial state update
# {"decision": ..., "escalation_reason": ...} that the conditional edge reads.


def decision(state: GraphState, deps: Deps) -> dict:
    guard = state["guard"]
    draft = state["draft"]
    repair_count = state.get("repair_count", 0)
    threshold = deps.cfg.confidence_threshold
    max_repairs = deps.cfg.max_repair_retries

    if guard.passed:
        if state.get("retrieval_weak", False):
            return out("escalate", "retrieval_weak")
        if draft.confidence < threshold:
            return out("escalate", "low_confidence")
        return out("finalize")

    # Guard failed.
    if proposes_forbidden_action(draft.suggested_action):
        return out("escalate", "forbidden_action")  # terminal: never repair
    if repair_count < max_repairs:
        return out("repair")
    return out("escalate", "repair_exhausted")
```
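The routing logic from the diagram, branch by branch: if the guard passed, escalate on weak retrieval (retrieval_weak) or on draft confidence below the threshold (low_confidence), otherwise finalize. If the guard failed, escalate on a forbidden action (forbidden_action), repair while repair_count < max_repair_retries, and escalate once that budget is spent (repair_exhausted).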
One critical rule: a forbidden action is terminal. Even with repair budget remaining, a policy violation escalates immediately — repair must never produce a policy-violating reply that eventually gets sent.
Node 6 — Repair: Package Feedback, Loop Once
```python
# workers/agent/graph/nodes/repair.py

def repair(state: GraphState, deps: Deps) -> dict:
    reasons = state["guard"].reasons
    return {
        "repair_count": state.get("repair_count", 0) + 1,
        "repair_feedback": "; ".join(reasons) if reasons else "improve grounding and clarity",
    }
```
A handful of lines: all this node does is increment the counter and package the guard's rejection reasons as a string. The draft node reads repair_feedback from state and appends it to the next user prompt, so the model is told exactly what the deterministic guard rejected.
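The bound on the repair loop is easiest to see by simulating it. This sketch strips the decision rules down to the guard verdict and the repair budget (retrieval_weak, confidence and forbidden actions are left out) and walks compose → review → route with a scripted sequence of guard results. Guard, route and run_loop are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Guard:
    passed: bool
    reasons: list[str] = field(default_factory=list)


def route(guard: Guard, repair_count: int, max_repairs: int) -> tuple[str, Optional[str]]:
    # Simplified decision node: only the guard verdict and the repair budget.
    if guard.passed:
        return "finalize", None
    if repair_count < max_repairs:
        return "repair", None
    return "escalate", "repair_exhausted"


def run_loop(guard_results: list[Guard], max_repairs: int):
    """Replay compose -> review -> route, one scripted guard verdict per draft."""
    repair_count = 0
    feedback_log: list[str] = []
    for guard in guard_results:
        decision, reason = route(guard, repair_count, max_repairs)
        if decision != "repair":
            return decision, reason, repair_count, feedback_log
        # Repair node: bump the counter, package the reasons for the next draft prompt.
        repair_count += 1
        feedback_log.append("; ".join(guard.reasons) or "improve grounding and clarity")
    raise RuntimeError("scripted guard results ran out before the loop terminated")
```

However many failures are scripted, the walk stops after at most max_repairs + 1 drafts, which is the bound the decision node provides.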
The LLM Layer: One Interface, Two Providers
Nodes depend on the ChatLLM protocol, not a concrete class. Switching from Ollama to OpenAI (or any OpenAI-compatible endpoint) is an env change, no code change.
```python
# workers/agent/llm/client.py
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class LLMResult:
    text: str
    model: str
    tokens: int
    cost_cents: float


class ChatLLM(Protocol):
    def complete(
        self, model: str, system: str, prompt: str,
        *, json_mode: bool = False, temperature: float = 0.0,
    ) -> LLMResult: ...
```
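Because ChatLLM is a typing.Protocol, any class with a matching complete method satisfies it structurally, without inheriting from it; FakeChat in the tests relies on exactly that. The sketch below re-declares the two types so it runs on its own, adds @runtime_checkable (an assumption, not in the original) so isinstance can be demonstrated, and uses a hypothetical CannedChat provider and ask helper.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class LLMResult:
    text: str
    model: str
    tokens: int
    cost_cents: float


@runtime_checkable  # added for this demo; lets isinstance() check method presence
class ChatLLM(Protocol):
    def complete(
        self, model: str, system: str, prompt: str,
        *, json_mode: bool = False, temperature: float = 0.0,
    ) -> LLMResult: ...


class CannedChat:
    """Hypothetical offline provider. Note: no inheritance from ChatLLM."""

    def __init__(self, reply: str) -> None:
        self.reply = reply

    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0) -> LLMResult:
        # Crude token count: whitespace-separated words in the prompt.
        return LLMResult(text=self.reply, model=model, tokens=len(prompt.split()), cost_cents=0.0)


def ask(chat: ChatLLM, question: str) -> str:
    # Depends only on the protocol, so any provider can be passed in.
    return chat.complete("any-model", "Answer briefly.", question).text
```

A runtime isinstance check against a protocol only verifies that the method exists, not its signature; static type checkers such as mypy do the full structural check.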
Ollama (local, $0 default):
```python
class OllamaChat:
    # __init__ (not shown) stores self.host, self.num_predict and an HTTP client in self._client.

    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0) -> LLMResult:
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt},
            ],
            "stream": False,
            "options": {"temperature": temperature, "num_predict": self.num_predict},
        }
        if json_mode:
            body["format"] = "json"  # Ollama constrains output to valid JSON
        resp = self._client.post(f"{self.host}/api/chat", json=body)
        resp.raise_for_status()
        data = resp.json()
        # Prompt tokens + generated tokens, as reported by Ollama.
        tokens = int(data.get("prompt_eval_count", 0)) + int(data.get("eval_count", 0))
        return LLMResult(text=data["message"]["content"], model=model,
                         tokens=tokens, cost_cents=0.0)
```
num_predict caps generated tokens. Without it, a slow CPU-only host can stall for minutes on a single draft and trigger an HTTP timeout, crash-looping the worker.
OpenAI (or any compatible endpoint via OPENAI_BASE_URL):
```python
class OpenAIChat:
    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0) -> LLMResult:
        body = {"model": model, "messages": [...], "temperature": temperature}
        if json_mode:
            body["response_format"] = {"type": "json_object"}
        resp = self._client.post("/chat/completions", json=body)
        resp.raise_for_status()
        data = resp.json()
        tokens = int(data.get("usage", {}).get("total_tokens", 0))
        return LLMResult(text=data["choices"][0]["message"]["content"],
                         model=model, tokens=tokens, cost_cents=...)
```
chat_from_env picks the provider at startup:
```python
def chat_from_env(cfg) -> ChatLLM:
    if cfg.llm_provider == "ollama":
        return OllamaChat(host=cfg.ollama_host, num_predict=cfg.draft_num_predict)
    if cfg.llm_provider == "openai":
        return OpenAIChat(api_key=cfg.openai_api_key, base_url=cfg.openai_base_url)
    raise RuntimeError(f"unsupported chat provider: {cfg.llm_provider!r}")
```
The Redis Consumer: Idempotent, Retryable, Dead-Lettered
```python
# workers/agent/bus.py

class Bus:
    def consume(self, handler) -> None:
        self.ensure_group()
        while not self._stop:
            self._reclaim(handler)  # re-deliver any idle (crashed) entries first
            resp = self._rdb.xreadgroup(
                self._cfg.consumer_group, self._cfg.consumer_name,
                {self._cfg.stream_messages: ">"}, count=16, block=self._cfg.block_ms,
            )
            for _stream, entries in resp or []:
                for entry_id, fields in entries:
                    self._dispatch(entry_id, fields, handler)
```
">" means "give me entries not yet delivered to this group" — new messages only. _reclaim calls XAUTOCLAIM to pick up entries that were delivered but never acknowledged (the worker crashed mid-processing).
Delivery budget — each entry gets max_deliveries attempts before it's dead-lettered:
```python
def _dispatch(self, entry_id, fields, handler) -> None:
    # Assumption: event_id travels as its own stream field next to "payload"
    # (its source isn't shown in this excerpt).
    event_id = _field(fields, "event_id")
    attempts = self._rdb.hincrby(self._attempts_key, entry_id, 1)
    if attempts > self._cfg.max_deliveries:
        self._dead_letter(entry_id, fields, "max deliveries exceeded")
        self._ack(entry_id, event_id)  # ack so the group moves past the poison message
        return
    try:
        payload = json.loads(_field(fields, "payload"))
        handler(payload)
        self._ack(entry_id, event_id)
    except Exception:
        # Leave unacked → XAUTOCLAIM will retry on the next _reclaim pass.
        log.exception("handler failed (attempt %d)", attempts)
```
Dead-lettered entries are XACK'd so the group makes progress — a poison message can't block the queue forever.
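The attempts counter and the dead-letter rule can be exercised without Redis. In this in-memory stand-in (DeliveryBudget is hypothetical), a dict plays the HINCRBY hash and "retry" stands for leaving the entry unacked for XAUTOCLAIM; a poison message gets max_deliveries handler attempts and is then dead-lettered and acked.

```python
from collections import defaultdict


class DeliveryBudget:
    """Hypothetical in-memory model of the per-entry delivery budget."""

    def __init__(self, max_deliveries: int) -> None:
        self.max_deliveries = max_deliveries
        self.attempts: defaultdict[str, int] = defaultdict(int)  # stands in for the HINCRBY hash
        self.acked: set[str] = set()
        self.dead_letters: list[tuple[str, dict]] = []

    def dispatch(self, entry_id: str, payload: dict, handler) -> str:
        self.attempts[entry_id] += 1
        if self.attempts[entry_id] > self.max_deliveries:
            self.dead_letters.append((entry_id, payload))
            self.acked.add(entry_id)  # ack so the group moves past the poison message
            return "dead_lettered"
        try:
            handler(payload)
        except Exception:
            return "retry"  # left unacked; a later reclaim re-delivers it
        self.acked.add(entry_id)
        return "acked"
```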
The Entrypoint: Wiring + Idempotent Handler
main.py wires everything and defines the per-message handler:
```python
# workers/agent/main.py

def _make_handler(deps, graph, conn, bus, rdb, tracer):
    def handle(payload: dict) -> None:
        event_id = payload.get("event_id", "")

        # Idempotent: a redelivered event must not produce a second draft.
        marker = f"processed:{event_id}"
        if not rdb.set(marker, "1", nx=True, ex=_PROCESSED_TTL_S):
            log.info("skipping already-processed event %s", event_id)
            return

        try:
            msg = MessageCreated.model_validate(payload)
            with message_span(tracer, "process_message", msg.trace_id) as span:
                state: GraphState = {"message": msg, "trace_id": msg.trace_id, "repair_count": 0}
                t0 = time.monotonic()
                final = graph.invoke(state, {"recursion_limit": deps.cfg.max_graph_steps})
                latency_ms = int((time.monotonic() - t0) * 1000)

                draft_id = db.insert_draft(conn, db.draft_row_from_state(final, latency_ms))
                bus.publish_draft(_outbound_payload(final, draft_id))

                span.set_attribute("draft.decision", final["decision"])
                span.set_attribute("draft.tokens", final.get("tokens_used", 0))
        except Exception:
            rdb.delete(marker)  # release marker so the bus can retry
            raise

    return handle
```
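rdb.set(marker, "1", nx=True, ex=TTL) is Redis's atomic SET ... NX EX: set the key only if it does not already exist, with an expiry. When the key exists, redis-py returns None, meaning this event was already processed (or is being processed by another worker), so the handler skips it without re-running the graph. nx=True is the critical flag: it makes the check-and-set a single atomic operation, so two concurrent workers picking up the same redelivered entry cannot both pass the check.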
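If the handler raises, rdb.delete(marker) releases the marker so the bus can retry. The retry starts clean when the failure happened before db.insert_draft; if bus.publish_draft fails after the insert, the retry inserts a second draft unless the drafts table also enforces one row per event (not shown here). A hard crash skips the except block entirely: the marker survives until its TTL expires, so a reclaimed redelivery of that entry is skipped and acknowledged without a draft.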
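The marker protocol in miniature, without Redis. MarkerStore is a hypothetical in-memory stand-in for SET key value NX EX ttl (a lock plays the part of Redis executing each command atomically), and handle_once, also hypothetical, mirrors the handler's skip, run, and release-on-exception steps.

```python
import threading
import time


class MarkerStore:
    """Hypothetical in-memory stand-in for Redis SET key value NX EX ttl."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._expires: dict[str, float] = {}

    def set_nx(self, key: str, ttl_s: float) -> bool:
        now = time.monotonic()
        with self._lock:  # check and set happen as one step, like a single Redis command
            exp = self._expires.get(key)
            if exp is not None and exp > now:
                return False
            self._expires[key] = now + ttl_s
            return True

    def delete(self, key: str) -> None:
        with self._lock:
            self._expires.pop(key, None)


def handle_once(store: MarkerStore, event_id: str, work) -> str:
    marker = f"processed:{event_id}"
    if not store.set_nx(marker, ttl_s=3600):
        return "skipped"
    try:
        work()
    except Exception:
        store.delete(marker)  # release so a retry can run
        raise
    return "processed"
```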
Tests: Zero Network, Zero DB
All node tests use FakeChat (returns canned JSON) and FakeRetriever (returns fixed docs). No mocking framework needed.
```python
# workers/agent/tests/fakes.py

class FakeChat:
    """Pops draft responses in order so tests can stage 'invalid then valid'."""

    def complete(self, model, system, prompt, *, json_mode=False, temperature=0.0):
        if "triage classifier" in system:
            return LLMResult(self.triage, model, 10, 0.0)
        text = self.drafts.pop(0) if self.drafts else draft_json()
        return LLMResult(text, model, 20, 0.0)


class FakeRetriever:
    def retrieve(self, query, intent, category=""):
        return list(self.docs)
```
```python
# workers/agent/tests/test_nodes.py
import unittest


class TriageNode(unittest.TestCase):
    def test_invalid_output_escalates_via_zero_confidence(self):
        # Why: garbled classifier response → zero confidence → decision escalates.
        out = triage({"message": make_message()},
                     deps_with(chat=FakeChat(triage="not json")))
        self.assertEqual(out["triage"].intent, "unknown")
        self.assertEqual(out["triage"].confidence, 0.0)


class GuardNode(unittest.TestCase):
    def test_fabricated_citation_fails_grounding(self):
        # Why: kb-99 was never retrieved, so it's a hallucinated source.
        r = self._guard(DraftResult(
            answer="x",
            citations=[{"kb_id": "kb-99", "title": "?", "snippet": "x"}],
            confidence=0.8, suggested_action="share_kb_article"))
        self.assertFalse(r.grounded)
        self.assertFalse(r.passed)

    def test_forbidden_action_fails_policy(self):
        r = self._guard(DraftResult(
            answer="Done.",
            citations=[{"kb_id": "kb-1", "title": "t", "snippet": "x"}],
            confidence=0.9, suggested_action="refund the customer"))
        self.assertFalse(r.policy_ok)


class DecisionNode(unittest.TestCase):
    def test_forbidden_action_is_terminal_no_repair(self):
        # Why: policy violation must never be repaired into a sent reply.
        out = decision(
            _state(passed=False, action="refund now", repair_count=0),
            deps_with())
        self.assertEqual(out["decision"], "escalate")
        self.assertEqual(out["escalation_reason"], "forbidden_action")

    def test_guard_fail_escalates_when_exhausted(self):
        out = decision(_state(passed=False, repair_count=1), deps_with())
        self.assertEqual(out["escalation_reason"], "repair_exhausted")


class DraftNode(unittest.TestCase):
    def test_invalid_then_valid_uses_retry(self):
        # Why: PRD §12: one retry on invalid structured output.
        out = self._draft(["not json", draft_json(confidence=0.7)])
        self.assertEqual(out["draft"].confidence, 0.7)
        self.assertEqual(out["tokens_used"], 40)  # both attempts counted

    def test_invalid_twice_gives_up_at_zero_confidence(self):
        out = self._draft(["bad", "still bad"])
        self.assertEqual(out["draft"].confidence, 0.0)
```
Run the Tests
```shell
cd resolver_code/workers/agent

# All worker tests
python -m pytest tests/ -v

# Just node unit tests
python -m pytest tests/test_nodes.py -v

# Just RAG tests
python -m pytest tests/test_rag.py -v
```