← Back to all posts
AITutorial

Let's Build a Customer Support AI Copilot: An Event-Driven Agent with LangGraph, Go, pgvector & Redis Streams [Part 2]

Karan Kashyap

Karan Kashyap

June 23, 2026

Let's Build a Customer Support AI Copilot: An Event-Driven Agent with LangGraph, Go, pgvector & Redis Streams [Part 2]

Part 2: The Go GraphQL API — Resolvers, Pub/Sub Bridge & Subscription Fan-Out


Package Layout

text
text
1services/api/
2├── cmd/server/main.go # entrypoint: wires everything, starts HTTP
3├── internal/
4│ ├── config/config.go # env validation at startup
5│ ├── repo/
6│ │ ├── store.go # Store interface (contract for testability)
7│ │ ├── models.go # DB ↔ Go type mapping
8│ │ └── postgres.go # pgx pool implementation
9│ ├── service/service.go # business logic: validate + persist + publish
10│ ├── resolvers/
11│ │ ├── resolver.go # root resolver wiring
12│ │ ├── schema.resolvers.go # generated + implemented resolver methods
13│ │ ├── mapping.go # repo → gql type converters
14│ │ └── errors.go # boundary error handling
15│ ├── pubsub/
16│ │ ├── events.go # typed event structs + constructors
17│ │ ├── redis.go # XADD publish + XREAD subscription bridge
18│ │ └── hub.go # in-process subscription fan-out
19│ └── obs/tracing.go # OpenTelemetry setup
20└── gqlgen.yml # codegen config

Step 1 — Configuration: Fail Fast at Startup

Nothing is worse than a service that silently boots with bad config and fails on the first request. config.Load() reads the environment, collects all missing/invalid values, and returns them together.

services/api/internal/config/config.go
go
1// services/api/internal/config/config.go
2
3package config
4
5type Config struct {
6 DatabaseURL string
7 PGPoolMaxConns int32
8
9 RedisURL string
10 StreamMessages string // API publishes message.created here
11 StreamDrafts string // API consumes draft.* here for subscriptions
12
13 APIHost string
14 APIPort int
15 CORSAllowedOrigins []string
16}
17
18func Load() (*Config, error) {
19 var problems []string
20 require := func(key string) string {
21 v := strings.TrimSpace(os.Getenv(key))
22 if v == "" {
23 problems = append(problems, "missing required env "+key)
24 }
25 return v
26 }
27
28 c := &Config{
29 DatabaseURL: require("DATABASE_URL"),
30 RedisURL: require("REDIS_URL"),
31 StreamMessages: getDefault("STREAM_MESSAGES", "messages"),
32 StreamDrafts: getDefault("STREAM_DRAFTS", "drafts"),
33 APIHost: getDefault("API_HOST", "0.0.0.0"),
34 CORSAllowedOrigins: getList("CORS_ALLOWED_ORIGINS",
35 []string{"http://localhost:3000"}),
36 }
37 c.PGPoolMaxConns = int32(getInt("PG_POOL_MAX_CONNS", 20, &problems))
38 c.APIPort = getInt("API_PORT", 8080, &problems)
39
40 if len(problems) > 0 {
41 return nil, fmt.Errorf("invalid config:\n - %s",
42 strings.Join(problems, "\n - "))
43 }
44 return c, nil
45}

require() appends to problems instead of returning early — one boot attempt surfaces every misconfiguration at once.


Step 2 — The Repository Interface

The service layer depends on a Store interface, not on *Postgres directly. This means tests get a zero-dependency fake, not a real database.

services/api/internal/repo/store.go
go
1// services/api/internal/repo/store.go
2
3package repo
4
5type Store interface {
6 // Creates a CUSTOMER message; auto-creates the conversation if ConversationID is nil.
7 CreateMessage(ctx context.Context, in CreateMessageInput) (Message, error)
8
9 GetConversation(ctx context.Context, id string) (*Conversation, error)
10 GetMessage(ctx context.Context, id string) (*Message, error)
11 GetDraft(ctx context.Context, id string) (*Draft, error)
12
13 // Priority queue: urgency-first, with negative-sentiment tie-breaking.
14 // Returns keyset cursor for the next page.
15 ListDrafts(ctx context.Context, status string, limit int, cursor string) (
16 items []Draft, nextCursor string, err error)
17
18 UpdateDraftStatus(ctx context.Context, id, status string, edited *string) (*Draft, error)
19 AppendAudit(ctx context.Context, e AuditEntry) error
20 ListEvalRuns(ctx context.Context, limit int) ([]EvalRun, error)
21 DashboardStats(ctx context.Context) (DashboardStats, error)
22
23 Close()
24}

Step 3 — The pgx Implementation

NewPostgres opens a sized connection pool and pings before returning — startup fails loudly rather than on the first query.

services/api/internal/repo/postgres.go
go
1// services/api/internal/repo/postgres.go
2
3func NewPostgres(ctx context.Context, dsn string, maxConns int32) (*Postgres, error) {
4 cfg, err := pgxpool.ParseConfig(dsn)
5 if err != nil {
6 return nil, fmt.Errorf("parse dsn: %w", err)
7 }
8 cfg.MaxConns = maxConns
9 pool, err := pgxpool.NewWithConfig(ctx, cfg)
10 if err != nil {
11 return nil, fmt.Errorf("open pool: %w", err)
12 }
13 if err := pool.Ping(ctx); err != nil {
14 pool.Close()
15 return nil, fmt.Errorf("ping: %w", err)
16 }
17 return &Postgres{pool: pool}, nil
18}

CreateMessage runs inside a transaction — either both the conversation row (if new) and the message row land, or neither does.

go
go
1func (p *Postgres) CreateMessage(ctx context.Context, in CreateMessageInput) (Message, error) {
2 tx, err := p.pool.Begin(ctx)
3 if err != nil {
4 return Message{}, fmt.Errorf("begin: %w", err)
5 }
6 defer tx.Rollback(ctx)
7
8 convID := ""
9 if in.ConversationID != nil && *in.ConversationID != "" {
10 // Verify the conversation exists; fail clearly if not.
11 if err := tx.QueryRow(ctx,
12 `SELECT id FROM conversations WHERE id = $1`, *in.ConversationID,
13 ).Scan(&convID); err != nil {
14 if errors.Is(err, pgx.ErrNoRows) { return Message{}, ErrNotFound }
15 return Message{}, fmt.Errorf("lookup conversation: %w", err)
16 }
17 } else {
18 ref := "anonymous"
19 if in.CustomerRef != nil && *in.CustomerRef != "" { ref = *in.CustomerRef }
20 if err := tx.QueryRow(ctx,
21 `INSERT INTO conversations (customer_ref, status)
22 VALUES ($1, 'OPEN') RETURNING id`, ref,
23 ).Scan(&convID); err != nil {
24 return Message{}, fmt.Errorf("insert conversation: %w", err)
25 }
26 }
27
28 var m Message
29 m.ConversationID = convID
30 if err := tx.QueryRow(ctx,
31 `INSERT INTO messages (conversation_id, role, body)
32 VALUES ($1, 'CUSTOMER', $2) RETURNING id, created_at`,
33 convID, in.Body,
34 ).Scan(&m.ID, &m.CreatedAt); err != nil {
35 return Message{}, fmt.Errorf("insert message: %w", err)
36 }
37 if err := tx.Commit(ctx); err != nil {
38 return Message{}, fmt.Errorf("commit: %w", err)
39 }
40 return m, nil
41}

Priority Queue with Keyset Pagination

The queue surfaces high-urgency, negative-sentiment tickets first. The SQL ranking expression and its Go mirror stay in lockstep — same formula, two representations tested against each other.

services/api/internal/repo/postgres.go
go
1// services/api/internal/repo/postgres.go
2
3// draftRank is a SQL expression: urgency dominates, sentiment breaks ties.
4// HIGH always outranks NORMAL regardless of sentiment.
5const draftRank = `(CASE urgency WHEN 'HIGH' THEN 6 WHEN 'NORMAL' THEN 4 ELSE 2 END` +
6 ` + CASE sentiment WHEN 'NEGATIVE' THEN 1 ELSE 0 END)`
7
8// goRank mirrors draftRank for building the keyset cursor in Go.
9func goRank(d Draft) int {
10 rank := 2
11 switch d.Urgency {
12 case "HIGH": rank = 6
13 case "NORMAL": rank = 4
14 }
15 if d.Sentiment == "NEGATIVE" { rank++ }
16 return rank
17}

The cursor is a rank|timestamp composite string. On the next page request it becomes a WHERE (rank, created_at) < ($cursor_rank, $cursor_time) row-value comparison — no OFFSET, no count queries.


Step 4 — The Service Layer

service.Service is the only place business rules live. It validates input, calls the store, publishes events, and writes audit rows. Resolvers never touch the store directly.

services/api/internal/service/service.go
go
1// services/api/internal/service/service.go
2
3const maxBodyLen = 8000
4
5func (s *Service) IngestMessage(ctx context.Context, in repo.CreateMessageInput) (repo.Message, error) {
6 ctx, span := obs.Tracer().Start(ctx, "ingest_message")
7 defer span.End()
8
9 body := strings.TrimSpace(in.Body)
10 if body == "" {
11 return repo.Message{}, fmt.Errorf("%w: body is required", ErrInvalidInput)
12 }
13 if len(body) > maxBodyLen {
14 return repo.Message{}, fmt.Errorf("%w: body exceeds %d chars", ErrInvalidInput, maxBodyLen)
15 }
16 in.Body = body
17
18 // Persist before publish: the worker can always load the row it's told about.
19 m, err := s.store.CreateMessage(ctx, in)
20 if err != nil { return repo.Message{}, err }
21
22 span.SetAttributes(
23 attribute.String("conversation.id", m.ConversationID),
24 attribute.String("message.id", m.ID),
25 )
26
27 // Carry the OTel trace into the event so the worker continues the same trace.
28 tid := traceID(ctx)
29 if sc := span.SpanContext(); sc.HasTraceID() { tid = sc.TraceID().String() }
30
31 ev := pubsub.NewMessageCreated(m.ID, m.ConversationID, m.Body, tid)
32 if err := s.pub.PublishMessageCreated(ctx, ev); err != nil {
33 // Message persisted but event lost — surface it so the caller can reprocess.
34 return m, fmt.Errorf("publish message.created: %w", err)
35 }
36 return m, nil
37}

ApproveReply calls transition(), which reads the draft before and after, writes the audit row, and returns the updated draft. Audit failure is logged but never rolls back the human action — an audit write error must not undo a completed approval.

go
go
1func (s *Service) transition(ctx context.Context,
2 draftID, status, action string, edited *string,
3) (*repo.Draft, error) {
4 before, err := s.store.GetDraft(ctx, draftID)
5 if err != nil { return nil, err }
6
7 after, err := s.store.UpdateDraftStatus(ctx, draftID, status, edited)
8 if err != nil { return nil, err }
9
10 if auditErr := s.store.AppendAudit(ctx, repo.AuditEntry{
11 DraftID: draftID, Actor: s.actor, Action: action,
12 Before: map[string]any{"status": before.Status},
13 After: map[string]any{"status": after.Status},
14 }); auditErr != nil {
15 // Best-effort: log it, don't undo the transition.
16 s.log.WarnContext(ctx, "audit write failed",
17 "draft_id", draftID, "action", action, "err", auditErr)
18 }
19 return after, nil
20}

Step 5 — The gqlgen Resolvers

gqlgen generates the interface stubs. We implement them in schema.resolvers.go. Each resolver does exactly one thing: call the service, convert the result to a GQL type.

services/api/internal/resolvers/schema.resolvers.go
go
1// services/api/internal/resolvers/schema.resolvers.go
2
3func (r *mutationResolver) IngestMessage(
4 ctx context.Context, input gql.IngestInput,
5) (*gql.Message, error) {
6 m, err := r.svc.IngestMessage(ctx, repo.CreateMessageInput{
7 ConversationID: input.ConversationID,
8 CustomerRef: input.CustomerRef,
9 Body: input.Body,
10 })
11 if err != nil {
12 return nil, boundaryErr(ctx, r.log, "ingestMessage", err)
13 }
14 return toGQLMessage(&m), nil
15}
16
17func (r *mutationResolver) ApproveReply(
18 ctx context.Context, draftID string, edited *string,
19) (*gql.Draft, error) {
20 d, err := r.svc.ApproveReply(ctx, draftID, edited)
21 if err != nil {
22 return nil, boundaryErr(ctx, r.log, "approveReply", err)
23 }
24 return toGQLDraft(d), nil
25}

The DraftUpdates subscription resolver is more interesting — it subscribes to the Hub, then in a goroutine it pulls each event, loads the full draft from the store, and forwards it to the channel the GQL runtime is reading.

go
go
1func (r *subscriptionResolver) DraftUpdates(
2 ctx context.Context, conversationID *string,
3) (<-chan *gql.Draft, error) {
4 cid := ""
5 if conversationID != nil { cid = *conversationID }
6
7 events, unsub := r.hub.Subscribe(cid)
8 out := make(chan *gql.Draft, 16)
9
10 go func() {
11 defer unsub()
12 defer close(out)
13 for {
14 select {
15 case <-ctx.Done():
16 return
17 case ev, ok := <-events:
18 if !ok { return }
19 d, err := r.svc.GetDraft(ctx, ev.DraftID)
20 if err != nil {
21 r.log.WarnContext(ctx, "draftUpdates: load draft failed",
22 "draft_id", ev.DraftID, "err", err)
23 continue
24 }
25 select {
26 case out <- toGQLDraft(d):
27 case <-ctx.Done():
28 return
29 }
30 }
31 }
32 }()
33 return out, nil
34}

Loading the full draft on each subscription event means the UI always receives the same rich Draft shape as a query — no separate subscription-specific type needed.


Step 6 — The Redis Pub/Sub Bridge

The Bus connects the two streams.

Publish side — the API calls PublishMessageCreated after persisting a message:

services/api/internal/pubsub/redis.go
go
1// services/api/internal/pubsub/redis.go
2
3func (b *Bus) PublishMessageCreated(ctx context.Context, ev MessageCreated) error {
4 payload, err := json.Marshal(ev)
5 if err != nil { return fmt.Errorf("marshal event: %w", err) }
6
7 return b.rdb.XAdd(ctx, &redis.XAddArgs{
8 Stream: b.streamMessages,
9 Values: map[string]any{
10 "type": ev.Type,
11 "event_id": ev.EventID,
12 "payload": payload, // full JSON under "payload"
13 },
14 }).Err()
15}

Consume side — a background goroutine reads new entries from the drafts stream and pushes them to the Hub:

go
go
1func (b *Bus) ConsumeDrafts(ctx context.Context) error {
2 lastID := "$" // only new messages from the moment the API starts
3 for {
4 if ctx.Err() != nil { return ctx.Err() }
5
6 res, err := b.rdb.XRead(ctx, &redis.XReadArgs{
7 Streams: []string{b.streamDrafts, lastID},
8 Block: 5 * time.Second,
9 Count: 64,
10 }).Result()
11 if err != nil {
12 if errors.Is(err, redis.Nil) { continue } // block timeout
13 if ctx.Err() != nil { return ctx.Err() }
14 return fmt.Errorf("xread %s: %w", b.streamDrafts, err)
15 }
16 for _, stream := range res {
17 for _, msg := range stream.Messages {
18 lastID = msg.ID
19 if ev, ok := decodeDraftEvent(msg.Values); ok {
20 b.hub.Publish(ev)
21 }
22 }
23 }
24 }
25}

"$" as the initial ID means the API only picks up events published after it boots — it doesn't replay the full history on restart. The subscription is for live updates; historical state comes from queries.


Step 7 — The Subscription Hub

Hub is an in-process fan-out: one entry per open WebSocket subscription. It's intentionally simple — sync.RWMutex over a map, a buffered channel per subscriber.

services/api/internal/pubsub/hub.go
go
1// services/api/internal/pubsub/hub.go
2
3type Hub struct {
4 mu sync.RWMutex
5 next int
6 subs map[int]*subscriber
7}
8
9type subscriber struct {
10 conversationID string // "" = receive all
11 ch chan DraftEvent
12}
13
14func (h *Hub) Subscribe(conversationID string) (<-chan DraftEvent, func()) {
15 h.mu.Lock()
16 defer h.mu.Unlock()
17 id := h.next
18 h.next++
19 s := &subscriber{conversationID: conversationID, ch: make(chan DraftEvent, 16)}
20 h.subs[id] = s
21 return s.ch, func() {
22 h.mu.Lock()
23 defer h.mu.Unlock()
24 if existing, ok := h.subs[id]; ok {
25 close(existing.ch)
26 delete(h.subs, id)
27 }
28 }
29}
30
31// Publish is non-blocking: full buffer = event dropped, not blocked.
32func (h *Hub) Publish(e DraftEvent) {
33 h.mu.RLock()
34 defer h.mu.RUnlock()
35 for _, s := range h.subs {
36 if s.conversationID != "" && s.conversationID != e.ConversationID { continue }
37 select {
38 case s.ch <- e:
39 default: // client too slow — drop and let them re-query
40 }
41 }
42}

The default branch in Publish is critical. A slow client must not stall the Redis consumer loop that serves every other subscriber.


Step 8 — The Entrypoint

main.go wires everything in dependency order, then starts the HTTP server. Shutdown drains gracefully on SIGINT/SIGTERM.

services/api/cmd/server/main.go
go
1// services/api/cmd/server/main.go
2
3func run(log *slog.Logger) error {
4 cfg, err := config.Load()
5 if err != nil { return err }
6
7 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
8 defer stop()
9
10 shutdownTracing, err := obs.Init(ctx, log)
11 if err != nil { return err }
12 defer shutdownTracing(...)
13
14 store, err := repo.NewPostgres(ctx, cfg.DatabaseURL, cfg.PGPoolMaxConns)
15 if err != nil { return err }
16 defer store.Close()
17
18 hub := pubsub.NewHub()
19 bus, err := pubsub.NewBus(ctx, cfg.RedisURL, cfg.StreamMessages, cfg.StreamDrafts, hub)
20 if err != nil { return err }
21 defer bus.Close()
22
23 svc := service.New(store, bus, log)
24 root := resolvers.New(svc, hub, log)
25
26 // Bridge drafts stream → subscriptions in background.
27 go bus.ConsumeDrafts(ctx)
28
29 mux := http.NewServeMux()
30 mux.Handle("/", playground.Handler("Resolver", "/query"))
31 mux.Handle("/query", otelhttp.NewHandler(graphqlHandler(root, ...), "graphql"))
32 mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
33 w.WriteHeader(http.StatusOK)
34 _, _ = w.Write([]byte("ok"))
35 })
36
37 srv := &http.Server{Addr: cfg.Addr(), Handler: withCORS(..., mux), ReadHeaderTimeout: 10 * time.Second}
38
39 // Wait for either a server error or a shutdown signal.
40 select {
41 case err := <-errCh:
42 return err
43 case <-ctx.Done():
44 return srv.Shutdown(context.WithTimeout(context.Background(), 15*time.Second))
45 }
46}

otelhttp.NewHandler wraps the GraphQL endpoint so every request gets an OTel server span. The trace ID extracted here is the one the service layer injects into the message.created event — the same trace continues in the Python worker.


Step 9 — Observability

OTel is configured by OTEL_TRACES_EXPORTER. In development it defaults to console (stdout). In a full stack it ships OTLP to Jaeger.

services/api/internal/obs/tracing.go
go
1// services/api/internal/obs/tracing.go
2
3func Init(ctx context.Context, log *slog.Logger) (func(context.Context) error, error) {
4 mode := os.Getenv("OTEL_TRACES_EXPORTER")
5 if mode == "" { mode = "console" }
6 if mode == "none" {
7 return func(context.Context) error { return nil }, nil
8 }
9
10 var exp sdktrace.SpanExporter
11 switch mode {
12 case "otlp": exp, _ = otlptracehttp.New(ctx) // OTEL_EXPORTER_OTLP_ENDPOINT
13 case "console": exp, _ = stdouttrace.New(stdouttrace.WithoutTimestamps())
14 }
15
16 tp := sdktrace.NewTracerProvider(
17 sdktrace.WithBatcher(exp),
18 sdktrace.WithResource(res),
19 )
20 otel.SetTracerProvider(tp)
21 // W3C trace-context so trace propagates API → event bus → worker → LLM.
22 otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
23 propagation.TraceContext{}, propagation.Baggage{},
24 ))
25 return tp.Shutdown, nil
26}

Run Jaeger with:

bash
bash
1docker compose --profile observability up jaeger
2# Then set in .env:
3# OTEL_TRACES_EXPORTER=otlp
4# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318

The UI will show one trace spanning the HTTP mutation → service → event publish → worker consume → LLM generate → draft write.


The Tests

Service tests — fake Store + fake Publisher

services/api/internal/service/service_test.go
go
1// services/api/internal/service/service_test.go
2
3func TestIngestMessagePersistsAndPublishesOnce(t *testing.T) {
4 store := newFakeStore()
5 pub := &fakePub{}
6 svc := New(store, pub, nil)
7
8 m, err := svc.IngestMessage(context.Background(),
9 repo.CreateMessageInput{Body: " my order is late "})
10 if err != nil { t.Fatalf("IngestMessage: %v", err) }
11
12 // Exactly one publish: duplicate = worker processes twice; zero = stranded.
13 if pub.count != 1 { t.Fatalf("want 1 event, got %d", pub.count) }
14
15 // Body trimmed before persist.
16 if store.created[0].Body != "my order is late" {
17 t.Fatalf("body not trimmed: %q", store.created[0].Body)
18 }
19}
20
21func TestIngestMessageRejectsBadInput(t *testing.T) {
22 cases := map[string]string{
23 "empty": " ",
24 "too long": strings.Repeat("x", maxBodyLen+1),
25 }
26 for name, body := range cases {
27 t.Run(name, func(t *testing.T) {
28 svc := New(newFakeStore(), &fakePub{}, nil)
29 _, err := svc.IngestMessage(context.Background(),
30 repo.CreateMessageInput{Body: body})
31 if !errors.Is(err, ErrInvalidInput) {
32 t.Fatalf("expected ErrInvalidInput, got %v", err)
33 }
34 })
35 }
36}
37
38func TestApproveReplyTransitionsAndAudits(t *testing.T) {
39 store := newFakeStore()
40 store.drafts["d1"] = &repo.Draft{ID: "d1", Status: "SUGGESTED"}
41 svc := New(store, &fakePub{}, nil)
42
43 edit := "Here is your refund status."
44 d, err := svc.ApproveReply(context.Background(), "d1", &edit)
45 if err != nil { t.Fatalf("ApproveReply: %v", err) }
46
47 if d.Status != "SENT" || d.Answer != edit {
48 t.Fatalf("unexpected draft: %+v", d)
49 }
50 if store.audits != 1 { t.Fatalf("expected 1 audit row, got %d", store.audits) }
51}

No Postgres, no Redis, no network. The fake implements Store and Publisher in ~50 lines.

Hub tests — concurrent fan-out correctness

services/api/internal/pubsub/hub_test.go
go
1// services/api/internal/pubsub/hub_test.go
2
3func TestHubDeliversToMatchingSubscribers(t *testing.T) {
4 h := NewHub()
5 all, unsubAll := h.Subscribe("") // wildcard
6 convA, unsubA := h.Subscribe("conv-A") // scoped
7 defer unsubAll(); defer unsubA()
8
9 h.Publish(DraftEvent{DraftID: "d1", ConversationID: "conv-A"})
10
11 // Wildcard receives it.
12 if e, ok := recvWithin(t, all, time.Second); !ok || e.DraftID != "d1" {
13 t.Fatalf("wildcard missed event")
14 }
15 // conv-A scoped receives it.
16 if e, ok := recvWithin(t, convA, time.Second); !ok || e.DraftID != "d1" {
17 t.Fatalf("conv-A subscriber missed event")
18 }
19
20 // conv-B event must NOT reach conv-A subscriber.
21 h.Publish(DraftEvent{DraftID: "d2", ConversationID: "conv-B"})
22 if _, ok := recvWithin(t, convA, 200*time.Millisecond); ok {
23 t.Fatal("conv-A received conv-B event")
24 }
25}
26
27func TestHubPublishDoesNotBlockOnFullBuffer(t *testing.T) {
28 h := NewHub()
29 _, unsub := h.Subscribe("") // never drained
30 defer unsub()
31
32 done := make(chan struct{})
33 go func() {
34 for i := 0; i < 100; i++ { h.Publish(DraftEvent{DraftID: "x"}) }
35 close(done)
36 }()
37 select {
38 case <-done:
39 case <-time.After(2 * time.Second):
40 t.Fatal("Publish blocked on a full subscriber buffer")
41 }
42}

Priority queue tests — SQL ↔ Go parity

services/api/internal/repo/priority_test.go
go
1// services/api/internal/repo/priority_test.go
2
3func TestGoRank(t *testing.T) {
4 tests := []struct{ urgency, sentiment string; want int }{
5 {"HIGH", "NEGATIVE", 7},
6 {"HIGH", "NEUTRAL", 6},
7 {"NORMAL", "NEGATIVE", 5},
8 {"NORMAL", "POSITIVE", 4},
9 {"LOW", "NEGATIVE", 3},
10 {"LOW", "POSITIVE", 2},
11 }
12 for _, tt := range tests {
13 got := goRank(Draft{Urgency: tt.urgency, Sentiment: tt.sentiment})
14 if got != tt.want {
15 t.Errorf("goRank(%s,%s) = %d, want %d",
16 tt.urgency, tt.sentiment, got, tt.want)
17 }
18 }
19}
20
21func TestGoRankOrders(t *testing.T) {
22 // Urgency strictly dominates sentiment.
23 if goRank(Draft{Urgency: "HIGH", Sentiment: "NEUTRAL"}) <=
24 goRank(Draft{Urgency: "NORMAL", Sentiment: "NEGATIVE"}) {
25 t.Error("HIGH urgency must outrank NORMAL regardless of sentiment")
26 }
27}

Request Flow, End to End

Request Flow End to End

The UI mutation returns after the XADD — before the AI pipeline runs. The draft arrives later over the WebSocket subscription. The API is never blocked on LLM latency.


Run the Tests

bash
bash
1cd resolver_code/services/api
2
3# All tests
4go test ./...
5
6# Verbose, with race detector
7go test -race -v ./...
8
9# Just service tests
10go test -v ./internal/service/...
11
12# Just hub tests
13go test -v ./internal/pubsub/...
14
15# Just repo/priority tests
16go test -v ./internal/repo/...

Blog series · 6 parts

Let's Build a Customer Support Co-Pilot

an Event-Driven AI Agent with LangGraph, Go, pgvector & Redis Streams

View on GitHub
GoPythonpgvectorRedisNext.jsDockerLangGraph

Ready to Build Something Extraordinary?

Let's discuss your idea. We'll show you how AI-powered development can compress your timeline and budget — without cutting corners.

We respond within 24 hours. No sales pitch — just a straight conversation about your project.

More from the Blog

Explore more engineering insights, case studies, and technical deep-dives.

View all posts →
Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 6]
AITutorial

Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 6]

DieCutGo Studio turns any uploaded artwork into a print-ready die-cut sticker — background removal, contour tracing, print-readiness checks, mockups, and a shareable storefront, all backed by a Go pipeline fast enough to feel instant. Over this series I'll walk through how the whole thing is built, starting today with the least glamorous but most consequential decision: how the repo itself is laid out.

Karan KashyapJul 4, 2026
Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 5]
AITutorial

Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 5]

DieCutGo Studio turns any uploaded artwork into a print-ready die-cut sticker — background removal, contour tracing, print-readiness checks, mockups, and a shareable storefront, all backed by a Go pipeline fast enough to feel instant. Over this series I'll walk through how the whole thing is built, starting today with the least glamorous but most consequential decision: how the repo itself is laid out.

Karan KashyapJul 3, 2026
Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 4]
AITutorial

Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 4]

DieCutGo Studio turns any uploaded artwork into a print-ready die-cut sticker — background removal, contour tracing, print-readiness checks, mockups, and a shareable storefront, all backed by a Go pipeline fast enough to feel instant. Over this series I'll walk through how the whole thing is built, starting today with the least glamorous but most consequential decision: how the repo itself is laid out.

Karan KashyapJul 3, 2026
Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 3]
AITutorial

Let's Build a Print-Ready Die-Cut Sticker SaaS from scratch in Golang & Next.js [Part 3]

DieCutGo Studio turns any uploaded artwork into a print-ready die-cut sticker — background removal, contour tracing, print-readiness checks, mockups, and a shareable storefront, all backed by a Go pipeline fast enough to feel instant. Over this series I'll walk through how the whole thing is built, starting today with the least glamorous but most consequential decision: how the repo itself is laid out.

Karan KashyapJul 2, 2026