← Home

Stateless graphs, stateful consumers

langgraphkafkaagents

The event description for the Brunson three from the last post reads: Brunson 26' 3PT Pullup Jump Shot (34 PTS). That's the entire raw payload. There's no lead_margin, no scoring_run_in_progress, no seconds_remaining. The insight that called it a "one-man rescue mission" with Brunson accounting for four of the Knicks' last five scoring plays — none of that came from the event. It came from state that had accumulated across the previous hundred events.

State has to live somewhere. In a LangGraph application, the obvious place is the graph state. LangGraph is built around it — TypedDict schema, annotated reducers, built-in persistence. For a chat agent, that's exactly right. For a queue-driven agent that processes one event at a time and never looks back, it's the wrong tool.

Two kinds of state

There are two distinct state-management problems in this agent, and they have different shapes.

The first is per-event state: what does the graph need to process this event? The raw play-by-play dict, the game context at this moment, the messages accumulating through the classifier's tool loop, whatever action the agent has decided on so far. This state is born when an event arrives, mutated as the graph runs through its nodes, and discarded when the graph finishes. It doesn't need to persist.

The second is cross-event state: what's accumulated since the game started? Running score, current period, clock, the last five scoring plays for momentum detection, per-player foul counts. This state has to outlive every graph invocation. A player's foul count at actionNumber 142 is the sum of every foul the tracker has seen since actionNumber 1.

LangGraph's AgentState handles the first problem cleanly:

class AgentState(TypedDict):
    event: dict
    game_context: dict
    messages: Annotated[list, add_messages]
    action: Action
    insight: str | None
    severity: str | None

game_context is a plain dict — a snapshot of the running game state at the moment this event was processed. It doesn't grow as the graph runs. The graph reads it but doesn't write to it.

The second problem — accumulating that snapshot across hundreds of events — is handled by GameContextTracker, a dataclass that lives in the consumer loop, not in the graph.

What the tracker actually stores

@dataclass
class GameContextTracker:
    game_id: str | None = None
    period: int = 1
    clock: str = "12:00"
    score_home: int = 0
    score_away: int = 0
    home_team: str = ""
    away_team: str = ""
    last_scoring_plays: deque = field(default_factory=lambda: deque(maxlen=5))
    player_fouls: dict[int, int] = field(default_factory=dict)

The last_scoring_plays field is a deque with maxlen=5 — a rolling window of the last five baskets. When the sixth one lands, the first drops off automatically. This is how the classifier knows whether three consecutive scoring plays belong to the same team: it reads game_context["last_scoring_plays"] and checks. The event itself is silent on momentum.

player_fouls is a dict keyed by personId. It increments on any actionType containing "foul" — personal, shooting, offensive, flagrant. The heuristic for "foul trouble" in the classifier prompt is "5 fouls on a key player in a close game." That requires knowing each player's accumulated foul count since tip-off. The event description will say "Foul: Love (P4.PN4)" but only the tracker can tell you the P in PN4 matches a history of three previous fouls.

The snapshot pattern

The tracker exposes two methods. update() folds one event into the running state:

def update(self, event: dict) -> dict:
    if event.get("period"):
        self.period = int(event["period"])
    if event.get("clock"):
        self.clock = parse_clock(event["clock"])
    # ... score tracking, team tricode capture, foul accumulation
    return self.snapshot()

snapshot() returns a plain dict copy of the current state:

def snapshot(self) -> dict:
    return {
        "game_id": self.game_id,
        "period": self.period,
        "clock": self.clock,
        "score_home": self.score_home,
        "score_away": self.score_away,
        "score_margin": self.score_home - self.score_away,
        "home_team": self.home_team,
        "away_team": self.away_team,
        "last_scoring_plays": list(self.last_scoring_plays),
        "player_fouls": dict(self.player_fouls),
    }

The graph receives this dict. It never touches the tracker directly. It can't — there's no reference to the tracker in AgentState. The snapshot is a value, not a handle. Whatever the graph does with it, the tracker's running state is unaffected.

How _process_event connects them

The consumer loop's inner function, _process_event, is where the two state systems meet:

async def _process_event(
    event: dict,
    tracker: GameContextTracker,
    seen_pairs: set[tuple[str, int]],
) -> None:
    snapshot = tracker.update(event)
 
    pair_key = (event.get("gameId"), event.get("actionNumber"))
    if pair_key in seen_pairs:
        return  # second half of a paired play; tracker updated, graph skipped
    seen_pairs.add(pair_key)
 
    prefiltered = should_skip(event, snapshot)
    if prefiltered is not None:
        return  # deterministic skip; no LLM call
 
    initial_state: AgentState = {
        "event": event,
        "game_context": snapshot,
        "messages": [],
        "action": Action.SKIPPED_OTHER,
        "insight": None,
        "severity": None,
    }
    final_state = await _graph.ainvoke(initial_state)

The tracker always updates. The graph only runs when the event makes it through dedup and the pre-filter. The snapshot passed into game_context is the post-update value — the tracker has already folded this event in, so the graph sees the state after this event, not before it. A scoring play's context snapshot includes the updated score; the classifier reads a margin that reflects the basket it's analyzing.

The dedup wrinkle

PlayByPlayV3 emits two events per actionNumber for paired plays — a steal and the turnover it generated are the same moment, sent as two separate rows with the same actionNumber. Without dedup, the agent generates two insights for one play, usually contradictory (one treating the turnover as the primary event, one treating the steal).

The fix is seen_pairs, a set[tuple[str, int]] tracking (gameId, actionNumber) pairs. When the second half of a paired play arrives, the tracker still folds it — the foul count, score, and team tricodes need to reflect both halves — but _process_event returns before invoking the graph. One moment, one insight.

The seen_pairs set lives in the consumer loop alongside the tracker. Both are per-run, per-game. If you restart the agent mid-game, you lose dedup history, but that's acceptable for this use case. The alternative — persisting seen_pairs across restarts — adds complexity without a real payoff for a replay or demo scenario.

Why this makes the graph testable

The function signature of _process_event tells the whole story:

async def _process_event(
    event: dict,
    tracker: GameContextTracker,
    seen_pairs: set[tuple[str, int]],
) -> None:

No Kafka, no consumer group, no offset management. You can call it in a test with a hand-constructed event dict, a fresh tracker, and an empty set:

tracker = GameContextTracker()
await _process_event(make_event(actionNumber=1), tracker, set())

The stateless-graph pattern is what makes this possible. Each invocation is a fresh AgentState. The graph doesn't know what event came before it, and it doesn't need to. If you want to test a specific scenario — say, a momentum run already in progress when an event arrives — you construct a tracker with the right last_scoring_plays, call tracker.snapshot(), and pass the result straight into AgentState. No consumer loop, no Kafka, no replaying 140 prior events to get to the right game state.

What this design costs

One place it creates friction: the graph can't write back to the tracker. If the classifier discovers something — say, a momentum run the score fields alone don't capture — it can put that in messages or insight, but it can't update the tracker's last_scoring_plays. The tracker is append-only from the consumer's perspective.

This hasn't mattered in practice because all the state the classifier actually needs is already in the snapshot. The tracker is the source of truth; the graph is a reader.

The other cost is that you have to think carefully about what belongs in the tracker versus what belongs in AgentState. A good default: if it needs to persist across events, it's tracker state. If it's computed fresh for this event and then discarded, it's graph state. Everything in AgentState is in the second category.

What comes next

The two-LLM split in Post 3 builds directly on this. The game_context snapshot from the tracker is what both the classifier and the narrator receive. The reason the two LLMs ended up on different models with different temperatures — and why that turned out to be the most consequential architectural decision in the project — becomes visible once you understand what each LLM is actually reading.