Summary
Event sourcing stores agent state as an append-only sequence of events rather than current values. This pattern enables complete audit trails, time-travel debugging, natural checkpoint/resume support, and deterministic replay. For AI agents that need fault tolerance, human-in-the-loop approval, and debuggability, event sourcing provides the foundation for production-grade reliability.
Why Event Sourcing for Agents?
Traditional state management stores current values. When a variable changes from 5 to 7, you lose the fact that it was ever 5. Event sourcing flips this: you store the change (increment by 2), not the result.
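As a minimal sketch of that difference, the snippet below keeps only the changes and recomputes the value on demand. The `CounterEvent` type and both helper functions are hypothetical names introduced for illustration.

```typescript
// Hypothetical event type: each event records a change, not a value.
type CounterEvent = { type: "incremented"; by: number };

// Derive the current value by folding over every recorded change.
function currentValue(events: CounterEvent[], initial = 0): number {
  return events.reduce((value, event) => value + event.by, initial);
}

// Recover the value at an earlier point by replaying only the first `count` events.
function valueAfter(events: CounterEvent[], count: number, initial = 0): number {
  return currentValue(events.slice(0, count), initial);
}

const changes: CounterEvent[] = [
  { type: "incremented", by: 5 },
  { type: "incremented", by: 2 },
];

console.log(currentValue(changes)); // 7
console.log(valueAfter(changes, 1)); // 5 (the earlier value is still recoverable)
```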
For AI agents, this inversion matters for several reasons:
1. Audit and Compliance
Every tool call, every decision, every approval is permanently recorded. When something goes wrong, you can trace exactly what happened.
2. Debugging Complex Failures
AI agents fail in subtle ways. A wrong tool call at step 47 might not manifest until step 83. Event logs let you replay execution to find the root cause.
3. Natural Pause/Resume
When an agent pauses for human approval, resumption is trivial: load the event log, derive current state, continue.
4. Retry Without Side Effects
If an agent crashes mid-execution, the log shows exactly which steps finished. A retry resumes after the last recorded event instead of re-running completed steps, so their side effects are not repeated.
5. Multi-Agent Coordination
Multiple agents can read the same event stream and build their own projections. This enables coordination without shared mutable state.
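Pause/resume and retry both come down to reading the log and finding what is unfinished. The sketch below assumes simplified event shapes with a `callId` field that pairs each call with its result. That field and the `findUnfinishedCalls` helper are assumptions for illustration, not part of the schema defined later.

```typescript
// Simplified, hypothetical event shapes; `callId` is assumed to pair calls with results.
type LogEvent =
  | { type: "tool_called"; callId: string; tool: string }
  | { type: "tool_succeeded"; callId: string }
  | { type: "tool_failed"; callId: string; retryable: boolean };

// Return the IDs of tool calls that were started but never got a result event.
// After a crash, these are the only steps whose outcome is unknown.
function findUnfinishedCalls(events: LogEvent[]): string[] {
  const open = new Map<string, string>(); // callId -> tool name
  for (const event of events) {
    if (event.type === "tool_called") {
      open.set(event.callId, event.tool);
    } else {
      open.delete(event.callId);
    }
  }
  return Array.from(open.keys());
}

const log: LogEvent[] = [
  { type: "tool_called", callId: "c1", tool: "read_file" },
  { type: "tool_succeeded", callId: "c1" },
  { type: "tool_called", callId: "c2", tool: "edit_file" },
  // Crash here: no result was recorded for c2
];

console.log(findUnfinishedCalls(log)); // [ 'c2' ]
```

The log only says that the outcome of `c2` is unknown, not that the call had no effect. Whether it is safe to re-run depends on whether the tool is idempotent.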
Core Concepts
Events vs. State
Traditional (State-based):
┌─────────────────────────────┐
│ task_status: "in_progress" │ ← Only know current state
│ step_count: 5 │
│ errors: ["timeout at 3"] │
│ last_tool: "edit_file" │
└─────────────────────────────┘
Event-Sourced:
┌─────────────────────────────────────────────────────────────┐
│ 1. { type: "task_started", task: "fix login", ts: T1 } │
│ 2. { type: "tool_called", tool: "read_file", ts: T2 } │
│ 3. { type: "tool_result", success: true, ts: T3 } │
│ 4. { type: "tool_called", tool: "edit_file", ts: T4 } │
│ 5. { type: "error", message: "timeout", ts: T5 } │ ← Full history
│ 6. { type: "retry", attempt: 2, ts: T6 } │
│ 7. { type: "tool_result", success: true, ts: T7 } │
│ 8. { type: "tool_called", tool: "run_tests", ts: T8 } │
│ 9. { type: "tool_result", success: true, ts: T9 } │
│ 10. { type: "task_completed", result: "success", ts: T10 } │
└─────────────────────────────────────────────────────────────┘
State is always derived from events, never stored as an independent source of truth. This keeps state consistent with history: whatever state you see can be explained by the events that produced it.
The Event Log as Source of Truth
interface EventLog {
threadId: string;
events: AgentEvent[];
// Append-only: never modify existing events
append(event: AgentEvent): Promise<void>;
// Read operations
getAll(): Promise<AgentEvent[]>;
getSince(sequenceNumber: number): Promise<AgentEvent[]>; // Events after this sequence number
getByType(type: string): Promise<AgentEvent[]>;
}
The event log has one invariant: events are never modified or deleted. Once appended, an event is permanent. To “undo” something, you append a compensating event.
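A minimal in-memory sketch of that invariant follows. `SimpleEvent`, `InMemoryLog`, the `approval_revoked` type, and the `revokes` payload field are hypothetical names chosen for illustration.

```typescript
// Hypothetical minimal event shape for illustration.
interface SimpleEvent {
  readonly sequenceNumber: number;
  readonly type: string;
  readonly payload: Readonly<Record<string, unknown>>;
}

class InMemoryLog {
  private readonly events: SimpleEvent[] = [];

  // The only write operation: add a new event at the end.
  // Object.freeze is shallow, so nested payload objects are not protected.
  append(type: string, payload: Record<string, unknown>): SimpleEvent {
    const event: SimpleEvent = Object.freeze({
      sequenceNumber: this.events.length,
      type,
      payload: Object.freeze({ ...payload }),
    });
    this.events.push(event);
    return event;
  }

  // Return a copy so callers cannot reorder or remove stored events.
  getAll(): SimpleEvent[] {
    return [...this.events];
  }
}

// A projection decides what the pair of events means: is any approval still active?
function hasActiveApproval(events: SimpleEvent[]): boolean {
  const revoked = new Set(
    events.filter(e => e.type === "approval_revoked").map(e => e.payload.revokes)
  );
  return events.some(
    e => e.type === "approval_granted" && !revoked.has(e.sequenceNumber)
  );
}

const log = new InMemoryLog();
const granted = log.append("approval_granted", { approver: "alice" });
// "Undo" by appending a compensating event that points at the original.
log.append("approval_revoked", {
  revokes: granted.sequenceNumber,
  reason: "granted in error",
});

console.log(log.getAll().length); // 2 (the original grant is still in the log)
console.log(hasActiveApproval(log.getAll())); // false
```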
Event Schema Design
Event Structure
Every event should include:
interface AgentEvent {
// Identity
id: string; // Unique event ID
sequenceNumber: number; // Monotonic ordering
threadId: string; // Which agent thread
// Timing
timestamp: Date; // When it occurred
// Classification
type: string; // Event type (see taxonomy below)
category: "lifecycle" | "action" | "result" | "human" | "system";
// Payload
payload: Record<string, unknown>;
// Context
parentEventId?: string; // For causal chains
correlationId?: string; // For distributed tracing
// Versioning
schemaVersion: number;
}
Event Taxonomy
Organize events into clear categories:
// Lifecycle Events
type LifecycleEvent =
| { type: "thread_created"; task: string; config: AgentConfig }
| { type: "thread_paused"; reason: string }
| { type: "thread_resumed"; feedback?: string }
| { type: "thread_completed"; result: unknown }
| { type: "thread_failed"; error: string }
| { type: "thread_cancelled"; reason: string };
// Action Events (agent decisions)
type ActionEvent =
| { type: "tool_called"; tool: string; params: unknown }
| { type: "plan_created"; steps: string[] }
| { type: "decision_made"; reasoning: string; choice: string }
| { type: "sub_agent_spawned"; subAgentId: string; task: string };
// Result Events (outcomes)
type ResultEvent =
| { type: "tool_succeeded"; result: unknown }
| { type: "tool_failed"; error: string; retryable: boolean }
| { type: "sub_agent_completed"; subAgentId: string; result: unknown }
| { type: "file_modified"; path: string; before: string; after: string };
// Human Interaction Events
type HumanEvent =
| { type: "approval_requested"; action: string; context: string }
| { type: "approval_granted"; approver: string }
| { type: "approval_denied"; approver: string; reason: string }
| { type: "human_feedback"; content: string; source: string }
| { type: "human_escalation"; urgency: string; context: string };
// System Events
type SystemEvent =
| { type: "context_compacted"; removedTokens: number }
| { type: "checkpoint_created"; snapshotId: string }
| { type: "rate_limited"; service: string; waitMs: number }
| { type: "budget_warning"; used: number; limit: number };
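// Union of all event-specific shapes. Named separately so it does not clash with
// the AgentEvent envelope interface defined earlier.
type AgentEventBody = LifecycleEvent | ActionEvent | ResultEvent | HumanEvent | SystemEvent;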
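One benefit of modelling the taxonomy as discriminated unions is that the compiler can check that every event type is handled. The sketch below uses a trimmed subset of the events above. `ToolEvent`, `assertNever`, and `describeEvent` are hypothetical names for illustration.

```typescript
// A trimmed subset of the taxonomy, kept small for illustration.
type ToolEvent =
  | { type: "tool_called"; tool: string; params: unknown }
  | { type: "tool_succeeded"; result: unknown }
  | { type: "tool_failed"; error: string; retryable: boolean };

// Hypothetical helper: if a case is missing from the switch, the argument is no
// longer of type `never` and compilation fails.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function describeEvent(event: ToolEvent): string {
  switch (event.type) {
    case "tool_called":
      return `called ${event.tool}`;
    case "tool_succeeded":
      return "succeeded";
    case "tool_failed":
      return event.retryable
        ? `failed (retryable): ${event.error}`
        : `failed: ${event.error}`;
    default:
      return assertNever(event);
  }
}

console.log(describeEvent({ type: "tool_called", tool: "read_file", params: {} }));
// called read_file
```

Adding a new member to `ToolEvent` without a matching `case` then surfaces as a compile error rather than a silently ignored event.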
Event Naming Conventions
- Past tense: Events describe what happened, not what will happen
  - Good: tool_called, approval_granted
  - Bad: call_tool, grant_approval
- Specific over generic: Avoid catch-all events
  - Good: file_created, file_modified, file_deleted
  - Bad: file_changed with a changeType field
- Self-documenting payloads: Event should be understandable in isolation
  - Good: { type: "tool_called", tool: "edit_file", path: "src/auth.ts" }
  - Bad: { type: "tool_called", ref: "t47" }
Projections: Views from Events
A projection is a view built by processing events. Different consumers build different projections from the same event stream.
Common Projections
// Projection 1: Current execution state (for agent)
interface ExecutionState {
currentStep: number;
status: "running" | "paused" | "completed" | "failed";
pendingApprovals: ApprovalRequest[];
toolCallCount: number;
lastActivity: Date;
}
function projectExecutionState(events: AgentEvent[]): ExecutionState {
const state: ExecutionState = {
currentStep: 0,
status: "running",
pendingApprovals: [],
toolCallCount: 0,
lastActivity: events[0]?.timestamp ?? new Date(),
};
for (const event of events) {
switch (event.type) {
case "tool_called":
state.currentStep++;
state.toolCallCount++;
break;
case "thread_paused":
state.status = "paused";
break;
case "thread_resumed":
state.status = "running";
break;
case "thread_completed":
state.status = "completed";
break;
case "thread_failed":
state.status = "failed";
break;
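case "approval_requested":
state.pendingApprovals.push({
requestEventId: event.id, // Lets the decision event refer back to this request
action: event.payload.action,
requestedAt: event.timestamp,
});
break;
case "approval_granted":
case "approval_denied":
// Decision events carry no action field in the taxonomy above, so this
// assumes they reference the request through parentEventId
state.pendingApprovals = state.pendingApprovals.filter(
a => a.requestEventId !== event.parentEventId
);
break;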
}
state.lastActivity = event.timestamp;
}
return state;
}
// Projection 2: Cost tracking (for billing)
interface CostProjection {
totalTokens: number;
inputTokens: number;
outputTokens: number;
toolCalls: number;
estimatedCost: number;
}
function projectCost(events: AgentEvent[]): CostProjection {
const projection: CostProjection = {
totalTokens: 0,
inputTokens: 0,
outputTokens: 0,
toolCalls: 0,
estimatedCost: 0,
};
for (const event of events) {
if (event.type === "tool_called") {
projection.toolCalls++;
}
if (event.payload?.tokens) {
projection.inputTokens += event.payload.tokens.input ?? 0;
projection.outputTokens += event.payload.tokens.output ?? 0;
}
}
projection.totalTokens = projection.inputTokens + projection.outputTokens;
projection.estimatedCost = calculateCost(projection);
return projection;
}
// Projection 3: Audit log (for compliance)
interface AuditEntry {
timestamp: Date;
action: string;
actor: "agent" | "human";
details: string;
}
function projectAuditLog(events: AgentEvent[]): AuditEntry[] {
return events
.filter(e => ["tool_called", "approval_granted", "approval_denied", "file_modified"].includes(e.type))
.map(e => ({
timestamp: e.timestamp,
action: e.type,
actor: e.category === "human" ? "human" : "agent",
details: JSON.stringify(e.payload),
}));
}
Projection Isolation
Each projection is independent. This means:
- Different systems can have different views of the same events
- Projections can be rebuilt from scratch at any time
- Adding new projections doesn’t require schema changes
- Projections can be optimized independently (caching, indexing)
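To illustrate the point about adding projections without schema changes, here is a sketch of a projection that could be introduced later and built entirely from existing events. `ProjEvent` and `projectFailuresByTool` are hypothetical names. The sketch assumes tool calls run one at a time, so a failure belongs to the most recent `tool_called` event.

```typescript
// Hypothetical minimal event shape; only the fields this projection reads.
interface ProjEvent {
  type: string;
  payload: Record<string, unknown>;
}

// Count failures per tool. Assumes calls are sequential, so each tool_failed
// event is attributed to the most recent tool_called event.
function projectFailuresByTool(events: ProjEvent[]): Map<string, number> {
  const failures = new Map<string, number>();
  let lastTool = "unknown";
  for (const event of events) {
    if (event.type === "tool_called" && typeof event.payload.tool === "string") {
      lastTool = event.payload.tool;
    } else if (event.type === "tool_failed") {
      failures.set(lastTool, (failures.get(lastTool) ?? 0) + 1);
    }
  }
  return failures;
}

const events: ProjEvent[] = [
  { type: "tool_called", payload: { tool: "read_file" } },
  { type: "tool_succeeded", payload: {} },
  { type: "tool_called", payload: { tool: "edit_file" } },
  { type: "tool_failed", payload: { error: "timeout", retryable: true } },
  { type: "tool_called", payload: { tool: "edit_file" } },
  { type: "tool_failed", payload: { error: "timeout", retryable: true } },
];

const failures = projectFailuresByTool(events);
console.log(failures.get("edit_file")); // 2
```

Because the function only reads the stream, it can be run over the full history the first time it is deployed and rebuilt from scratch whenever its logic changes.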
Snapshots: Performance Optimization
For long-running agents, replaying thousands of events to derive state becomes slow. Snapshots solve this by periodically capturing derived state.
interface Snapshot {
id: string;
threadId: string;
sequenceNumber: number; // Event sequence at snapshot time
timestamp: Date;
state: ExecutionState; // Projection at this point
}
class SnapshotStore {
async createSnapshot(threadId: string, events: AgentEvent[]): Promise<Snapshot> {
const state = projectExecutionState(events);
const lastEvent = events[events.length - 1];
const snapshot: Snapshot = {
id: crypto.randomUUID(),
threadId,
sequenceNumber: lastEvent.sequenceNumber,
timestamp: new Date(),
state,
};
await this.db.save(snapshot);
return snapshot;
}
async loadStateEfficiently(threadId: string): Promise<ExecutionState> {
// 1. Find most recent snapshot
const snapshot = await this.db.getLatestSnapshot(threadId);
// 2. Get events since snapshot
const recentEvents = snapshot
? await this.eventLog.getSince(snapshot.sequenceNumber)
: await this.eventLog.getAll();
// 3. Apply recent events to snapshot state
const baseState = snapshot?.state ?? initialState();
return applyEvents(baseState, recentEvents);
}
}
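The loader above calls `initialState` and `applyEvents` without defining them. One way to write them is as a pure reducer, sketched below with simplified stand-ins (`Ev`, `State`) for the article's `AgentEvent` and `ExecutionState`. The property to check is that the snapshot plus the later events gives the same state as a full replay.

```typescript
// Simplified stand-ins for AgentEvent and ExecutionState.
interface Ev {
  sequenceNumber: number;
  type: string;
}
interface State {
  status: "running" | "paused" | "completed" | "failed";
  toolCallCount: number;
}

function initialState(): State {
  return { status: "running", toolCallCount: 0 };
}

// Pure single-event reducer: returns a new state and never mutates its input.
function applyEvent(state: State, event: Ev): State {
  switch (event.type) {
    case "tool_called":
      return { ...state, toolCallCount: state.toolCallCount + 1 };
    case "thread_paused":
      return { ...state, status: "paused" };
    case "thread_resumed":
      return { ...state, status: "running" };
    case "thread_completed":
      return { ...state, status: "completed" };
    case "thread_failed":
      return { ...state, status: "failed" };
    default:
      return state;
  }
}

function applyEvents(base: State, events: Ev[]): State {
  return events.reduce(applyEvent, base);
}

const events: Ev[] = [
  "tool_called", "tool_called", "thread_paused", "thread_resumed", "tool_called",
].map((type, i) => ({ sequenceNumber: i, type }));

const full = applyEvents(initialState(), events);

// Snapshot taken after sequenceNumber 2; only strictly later events are replayed.
const snapshot = {
  sequenceNumber: 2,
  state: applyEvents(initialState(), events.slice(0, 3)),
};
const tail = events.filter(e => e.sequenceNumber > snapshot.sequenceNumber);
const fromSnapshot = applyEvents(snapshot.state, tail);

console.log(JSON.stringify(full) === JSON.stringify(fromSnapshot)); // true
```

The tail has to start strictly after the snapshot's sequence number. If the store's `getSince` were inclusive, the snapshot's last event would be applied twice.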
Snapshot Strategy
When to create snapshots:
const SNAPSHOT_STRATEGIES = {
// Time-based: Every N minutes
timeBased: (lastSnapshot: Snapshot) => {
const age = Date.now() - lastSnapshot.timestamp.getTime();
return age > 5 * 60 * 1000; // 5 minutes
},
// Count-based: Every N events
countBased: (events: AgentEvent[], lastSnapshot: Snapshot) => {
// Sequence numbers are 0-based, so the snapshot already covers sequenceNumber + 1 events
const eventsSinceSnapshot = events.length - (lastSnapshot.sequenceNumber + 1);
return eventsSinceSnapshot > 100;
},
// Milestone-based: At significant points
milestoneBased: (event: AgentEvent) => {
return ["thread_paused", "approval_granted", "sub_agent_completed"].includes(event.type);
},
};
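In practice the three strategies are often combined into one decision. The sketch below is one possible combination, not a prescribed policy. `shouldSnapshot`, `SnapshotMeta`, and the default thresholds are assumptions, and it also handles the case where no snapshot exists yet.

```typescript
interface SnapshotMeta {
  sequenceNumber: number;
  timestamp: Date;
}
interface Ev {
  sequenceNumber: number;
  type: string;
}

const MILESTONES = new Set(["thread_paused", "approval_granted", "sub_agent_completed"]);

// Decide whether to snapshot after `latest` has been appended.
function shouldSnapshot(
  latest: Ev,
  lastSnapshot: SnapshotMeta | undefined,
  now: Date = new Date(),
  maxEvents = 100,
  maxAgeMs = 5 * 60 * 1000
): boolean {
  // With no snapshot yet, count from the start of the log (0-based sequence numbers).
  const eventsSince = lastSnapshot
    ? latest.sequenceNumber - lastSnapshot.sequenceNumber
    : latest.sequenceNumber + 1;

  if (eventsSince <= 0) return false; // Snapshot already covers the latest event
  if (MILESTONES.has(latest.type)) return true; // Milestone-based
  if (eventsSince > maxEvents) return true; // Count-based
  if (lastSnapshot) {
    // Time-based, but only when there is something new to capture
    return now.getTime() - lastSnapshot.timestamp.getTime() > maxAgeMs;
  }
  return false;
}

const takenAt = new Date("2024-01-01T00:00:00Z");
const last: SnapshotMeta = { sequenceNumber: 10, timestamp: takenAt };

console.log(shouldSnapshot({ sequenceNumber: 11, type: "thread_paused" }, last, takenAt)); // true
console.log(shouldSnapshot({ sequenceNumber: 11, type: "tool_called" }, last, takenAt)); // false
```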
Time-Travel Debugging
Event sourcing enables powerful debugging: replay events to any point in time.
Replay to Specific Point
async function replayToEvent(
threadId: string,
targetEventId: string
): Promise<ExecutionState> {
const allEvents = await eventLog.getAll(threadId);
const targetIndex = allEvents.findIndex(e => e.id === targetEventId);
if (targetIndex === -1) {
throw new Error(`Event ${targetEventId} not found`);
}
// Replay only up to target event
const eventsToReplay = allEvents.slice(0, targetIndex + 1);
return projectExecutionState(eventsToReplay);
}
Debugging Workflow
async function debugFailure(threadId: string): Promise<DebugReport> {
const events = await eventLog.getAll(threadId);
// Find failure event
const failureEvent = events.find(e => e.type === "thread_failed");
if (!failureEvent) {
return { status: "no_failure_found" };
}
// Get events leading to failure
const failureIndex = events.indexOf(failureEvent);
const precedingEvents = events.slice(Math.max(0, failureIndex - 10), failureIndex);
// Identify potential causes
const potentialCauses = precedingEvents
.filter(e => e.type === "tool_failed" || e.type === "error")
.map(e => ({
event: e,
stateAtTime: projectExecutionState(events.slice(0, events.indexOf(e) + 1)),
}));
return {
failureEvent,
precedingEvents,
potentialCauses,
fullTimeline: events.map(e => ({
id: e.id,
type: e.type,
timestamp: e.timestamp,
})),
};
}
Interactive Replay
For complex debugging, replay with breakpoints:
class InteractiveReplay {
private events: AgentEvent[];
private currentIndex: number = 0;
private breakpoints: Set<string> = new Set();
constructor(events: AgentEvent[]) {
this.events = events;
}
setBreakpoint(eventType: string): void {
this.breakpoints.add(eventType);
}
step(): { event: AgentEvent; state: ExecutionState } {
if (this.currentIndex >= this.events.length) {
throw new Error("End of events");
}
const event = this.events[this.currentIndex++];
const state = projectExecutionState(this.events.slice(0, this.currentIndex));
return { event, state };
}
continue(): { event: AgentEvent; state: ExecutionState } | null {
while (this.currentIndex < this.events.length) {
const { event, state } = this.step();
if (this.breakpoints.has(event.type)) {
return { event, state };
}
}
return null; // Reached end
}
rewind(steps: number = 1): void {
this.currentIndex = Math.max(0, this.currentIndex - steps);
}
getCurrentState(): ExecutionState {
return projectExecutionState(this.events.slice(0, this.currentIndex));
}
}
Distributed Agents and Event Ordering
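When multiple agents work together, event ordering becomes complex: each agent numbers its own events, and wall-clock timestamps from different machines cannot be relied on to agree.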
Causal Ordering
Use parent event IDs to track causality:
interface CausalEvent extends AgentEvent {
parentEventId?: string; // Event that caused this one
correlationId: string; // Shared ID for related events across agents
}
// When spawning a sub-agent
const spawnEvent: CausalEvent = {
id: "evt_123",
type: "sub_agent_spawned",
correlationId: "task_abc",
payload: { subAgentId: "agent_456", task: "review code" },
// ...
};
// Sub-agent's first event references parent
const subAgentStartEvent: CausalEvent = {
id: "evt_124",
type: "thread_created",
parentEventId: "evt_123", // Caused by spawn event
correlationId: "task_abc", // Same correlation ID
// ...
};
Event Aggregation
Aggregate events from multiple agents:
interface MultiAgentEventStore {
// Append to specific agent's stream
append(agentId: string, event: AgentEvent): Promise<void>;
// Get events across all agents, ordered by timestamp
getAllByCorrelation(correlationId: string): Promise<AgentEvent[]>;
// Build global timeline
getGlobalTimeline(since: Date): Promise<AgentEvent[]>;
}
async function buildMultiAgentTimeline(
correlationId: string,
eventStore: MultiAgentEventStore
): Promise<TimelineEntry[]> {
const events = await eventStore.getAllByCorrelation(correlationId);
// Sort by timestamp (or use vector clocks for true ordering)
events.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
return events.map(e => ({
agentId: e.threadId,
event: e,
causalParent: events.find(p => p.id === e.parentEventId),
}));
}
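The comment above mentions vector clocks as an alternative to timestamp sorting. A minimal sketch of the idea follows. The `VectorClock` type and the `tick`, `merge`, and `compareClocks` helpers are illustrative names, and attaching a clock to each event is an assumption beyond the schema shown earlier.

```typescript
// A vector clock maps each agent ID to the number of events it has produced or observed.
type VectorClock = Record<string, number>;
type Ordering = "before" | "after" | "equal" | "concurrent";

// Advance the local agent's own counter before it emits an event.
function tick(clock: VectorClock, agentId: string): VectorClock {
  return { ...clock, [agentId]: (clock[agentId] ?? 0) + 1 };
}

// On receiving an event from another agent: element-wise max, then tick.
function merge(local: VectorClock, remote: VectorClock, agentId: string): VectorClock {
  const merged: VectorClock = { ...local };
  for (const agent of Object.keys(remote)) {
    merged[agent] = Math.max(merged[agent] ?? 0, remote[agent] ?? 0);
  }
  return tick(merged, agentId);
}

// a is "before" b if every counter in a is <= the one in b and at least one is smaller.
// If each clock is ahead somewhere, neither event could have caused the other.
function compareClocks(a: VectorClock, b: VectorClock): Ordering {
  let aAhead = false;
  let bAhead = false;
  for (const agent of Object.keys({ ...a, ...b })) {
    const av = a[agent] ?? 0;
    const bv = b[agent] ?? 0;
    if (av > bv) aAhead = true;
    if (bv > av) bAhead = true;
  }
  if (aAhead && bAhead) return "concurrent";
  if (aAhead) return "after";
  if (bAhead) return "before";
  return "equal";
}

const spawnClock = tick({}, "planner");                 // { planner: 1 }
const childClock = merge({}, spawnClock, "reviewer");   // { planner: 1, reviewer: 1 }
const siblingClock = tick(spawnClock, "planner");       // { planner: 2 }

console.log(compareClocks(spawnClock, childClock));   // before
console.log(compareClocks(childClock, siblingClock)); // concurrent
```

Unlike a timestamp sort, this distinguishes "happened before" from "unrelated". Concurrent events still need a tie-break rule if a single total order is required.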
Conflict Resolution
When events can arrive out of order:
class EventuallyConsistentStore {
private pendingEvents: Map<string, AgentEvent[]> = new Map();
async append(event: AgentEvent): Promise<void> {
// If event has a parent, check if parent exists
if (event.parentEventId) {
const parentExists = await this.eventExists(event.parentEventId);
if (!parentExists) {
// Buffer until parent arrives
const pending = this.pendingEvents.get(event.parentEventId) ?? [];
pending.push(event);
this.pendingEvents.set(event.parentEventId, pending);
return;
}
}
// Append event
await this.store(event);
// Process any events waiting for this one
const waitingEvents = this.pendingEvents.get(event.id) ?? [];
for (const waiting of waitingEvents) {
await this.append(waiting);
}
this.pendingEvents.delete(event.id);
}
}
Storage Options
JSON File (Simple)
Good for single-agent development:
import { readFile, writeFile } from "node:fs/promises";
class JsonFileEventStore implements EventLog {
constructor(private filePath: string) {}
async append(event: AgentEvent): Promise<void> {
const events = await this.load();
event.sequenceNumber = events.length;
events.push(event);
await writeFile(this.filePath, JSON.stringify(events, null, 2));
}
async getAll(): Promise<AgentEvent[]> {
return this.load();
}
private async load(): Promise<AgentEvent[]> {
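try {
const content = await readFile(this.filePath, "utf-8");
return JSON.parse(content);
} catch (error) {
// Only a missing file means "no events yet". A corrupt or unreadable file must not
// be treated as empty, or the next append would overwrite the whole log.
if ((error as NodeJS.ErrnoException).code === "ENOENT") return [];
throw error;
}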
}
}
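The store above rewrites the whole file on every append. A variant that keeps the file itself append-only is to write one JSON object per line (JSON Lines). The sketch below is an illustration under stated assumptions: `JsonLinesEventStore` and `StoredEvent` are hypothetical names, it uses Node's synchronous `fs` calls for brevity, and it assumes a single writer process.

```typescript
import { appendFileSync, existsSync, mkdtempSync, readFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

interface StoredEvent {
  sequenceNumber: number;
  type: string;
  payload: Record<string, unknown>;
}

class JsonLinesEventStore {
  private readonly filePath: string;
  private nextSequence: number;

  constructor(filePath: string) {
    this.filePath = filePath;
    // Resume numbering from whatever is already on disk.
    this.nextSequence = this.getAll().length;
  }

  // Each append writes exactly one line; existing lines are never rewritten.
  append(type: string, payload: Record<string, unknown>): StoredEvent {
    const event: StoredEvent = { sequenceNumber: this.nextSequence++, type, payload };
    appendFileSync(this.filePath, JSON.stringify(event) + "\n", "utf-8");
    return event;
  }

  getAll(): StoredEvent[] {
    if (!existsSync(this.filePath)) return [];
    return readFileSync(this.filePath, "utf-8")
      .split("\n")
      .filter(line => line.trim() !== "")
      .map(line => JSON.parse(line) as StoredEvent);
  }
}

// Usage: write to a fresh temporary directory so the example leaves real files alone.
const filePath = join(mkdtempSync(join(tmpdir(), "events-")), "thread.jsonl");
const store = new JsonLinesEventStore(filePath);
store.append("task_started", { task: "fix login" });
store.append("tool_called", { tool: "read_file" });

// A new instance over the same file sees the same history and continues numbering.
const reopened = new JsonLinesEventStore(filePath);
const third = reopened.append("tool_succeeded", {});

console.log(third.sequenceNumber); // 2
console.log(reopened.getAll().length); // 3
```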
SQLite (Production Single-Node)
Good for production single-agent with querying:
class SqliteEventStore implements EventLog {
constructor(private db: Database) {
this.db.exec(`
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
sequence_number INTEGER UNIQUE,
thread_id TEXT,
type TEXT,
category TEXT,
payload TEXT,
timestamp TEXT,
parent_event_id TEXT,
correlation_id TEXT,
schema_version INTEGER
);
CREATE INDEX IF NOT EXISTS idx_thread ON events(thread_id);
CREATE INDEX IF NOT EXISTS idx_type ON events(type);
CREATE INDEX IF NOT EXISTS idx_correlation ON events(correlation_id);
`);
}
async append(event: AgentEvent): Promise<void> {
const sequence = this.db.prepare(
"SELECT COALESCE(MAX(sequence_number), -1) + 1 FROM events"
).pluck().get();
this.db.prepare(`
INSERT INTO events (
id, sequence_number, thread_id, type, category,
payload, timestamp, parent_event_id, correlation_id, schema_version
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
event.id,
sequence,
event.threadId,
event.type,
event.category,
JSON.stringify(event.payload),
event.timestamp.toISOString(),
event.parentEventId,
event.correlationId,
event.schemaVersion
);
}
async getByType(type: string): Promise<AgentEvent[]> {
const rows = this.db.prepare(
"SELECT * FROM events WHERE type = ? ORDER BY sequence_number"
).all(type);
// Arrow function keeps `this` bound when rowToEvent runs
return rows.map(row => this.rowToEvent(row));
}
}
Cloud Database (Distributed)
For multi-agent or cloud deployments:
// Example with PostgreSQL
class PostgresEventStore implements EventLog {
async append(event: AgentEvent): Promise<void> {
await this.pool.query(`
INSERT INTO events (
id, thread_id, type, payload, timestamp, correlation_id
) VALUES ($1, $2, $3, $4, $5, $6)
`, [
event.id,
event.threadId,
event.type,
event.payload,
event.timestamp,
event.correlationId,
]);
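// Notify listeners (for real-time projections). pg_notify accepts bound parameters,
// so the event ID is never interpolated into the SQL string.
await this.pool.query("SELECT pg_notify($1, $2)", ["agent_events", event.id]);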
}
}
// Example with DynamoDB
class DynamoEventStore implements EventLog {
async append(event: AgentEvent): Promise<void> {
await this.client.send(new PutItemCommand({
TableName: "agent_events",
Item: {
pk: { S: `thread#${event.threadId}` },
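// Sort key uses the zero-padded sequence number rather than wall-clock time, so key
// order matches log order (assumes sequenceNumber is assigned before append)
sk: { S: `event#${String(event.sequenceNumber).padStart(12, "0")}#${event.id}` },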
type: { S: event.type },
payload: { S: JSON.stringify(event.payload) },
gsi1pk: { S: `correlation#${event.correlationId}` },
},
}));
}
}
Schema Evolution
Events are permanent, but schemas evolve. Handle this gracefully:
Versioning Strategy
interface VersionedEvent extends AgentEvent {
schemaVersion: number;
}
const CURRENT_SCHEMA_VERSION = 3;
function migrateEvent(event: VersionedEvent): VersionedEvent {
let current = event;
// v1 -> v2: Added correlation_id
if (current.schemaVersion === 1) {
current = {
...current,
correlationId: current.threadId, // Default to thread ID
schemaVersion: 2,
};
}
// v2 -> v3: Renamed 'error' to 'tool_failed'
if (current.schemaVersion === 2) {
if (current.type === "error") {
current = {
...current,
type: "tool_failed",
schemaVersion: 3,
};
} else {
current = { ...current, schemaVersion: 3 };
}
}
return current;
}
// Apply migration when loading events
async function loadEvents(threadId: string): Promise<AgentEvent[]> {
const rawEvents = await eventStore.getAll(threadId);
return rawEvents.map(migrateEvent);
}
Upcasting vs. Copy-and-Transform
Two strategies for handling old events:
// Upcasting: Migrate on read (lazy)
// Pros: No data modification, fast deployment
// Cons: Migration code runs every read
class UpcastingStore {
async getAll(): Promise<AgentEvent[]> {
const raw = await this.storage.getAll();
return raw.map(migrateEvent);
}
}
// Copy-and-Transform: Migrate once (eager)
// Pros: Clean data, fast reads
// Cons: Requires downtime or careful orchestration
async function migrateLegacyEvents(oldStore: EventStore, newStore: EventStore): Promise<void> {
const oldEvents = await oldStore.getAll();
const migratedEvents = oldEvents.map(migrateEvent);
// Write to new store (bulkInsert is assumed to exist on the target store)
await newStore.bulkInsert(migratedEvents);
// Switch over
await switchToNewStore();
}
Best Practices
1. Events Should Be Self-Contained
// Good: Event has all context needed
{
type: "file_modified",
payload: {
path: "src/auth.ts",
before: "function login() { ... }",
after: "function login(user: User) { ... }",
reason: "Added type annotation per review feedback",
}
}
// Bad: Event requires external lookup
{
type: "file_modified",
payload: {
fileId: "f_123", // What file? Requires lookup
diffId: "d_456", // What changed? Requires lookup
}
}
2. Make Events Immutable
// Good: Compensating event for "undo"
events: [
{ type: "approval_granted", approver: "alice" },
{ type: "approval_revoked", reason: "granted in error" }, // New event
]
// Bad: Modifying existing event
events: [
{ type: "approval_granted", approver: "alice", revoked: true }, // Modified
]
3. Separate Commands from Events
// Command: Request to do something (may fail)
interface StartTaskCommand {
type: "start_task";
task: string;
}
// Event: Record of what happened (always past tense)
interface TaskStartedEvent {
type: "task_started";
task: string;
timestamp: Date;
}
// Handler converts command to event
async function handleCommand(command: Command): Promise<Event> {
switch (command.type) {
case "start_task":
// Validation, side effects, etc.
return {
type: "task_started",
task: command.task,
timestamp: new Date(),
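};
default:
// Reject unknown commands instead of silently returning undefined
throw new Error(`Unknown command: ${JSON.stringify(command)}`);
}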
}
4. Include Enough Metadata
// Good: Rich metadata for debugging and analysis
{
id: "evt_abc123",
type: "tool_called",
timestamp: new Date(),
threadId: "thread_xyz",
sequenceNumber: 47,
correlationId: "task_123",
parentEventId: "evt_abc122",
schemaVersion: 3,
payload: {
tool: "edit_file",
params: { path: "src/auth.ts" },
duration_ms: 150,
model: "claude-sonnet-4-5-20250929",
prompt_tokens: 1500,
}
}
Common Pitfalls
Pitfall 1: Storing Derived State Alongside Events
// Bad: State can drift from events
{
events: [...],
state: { step: 5 }, // What if events say step is 6?
}
// Good: Always derive state
const state = projectExecutionState(events);
Pitfall 2: Events That Are Too Granular
// Bad: Noise events that don't add value
{ type: "character_typed", char: "h" },
{ type: "character_typed", char: "e" },
{ type: "character_typed", char: "l" },
// ...
// Good: Meaningful events
{ type: "file_content_changed", path: "src/auth.ts", content: "..." }
Pitfall 3: Events That Are Too Coarse
// Bad: Lost granularity
{ type: "work_done", result: "completed 5 files" }
// Good: Each action is an event
{ type: "file_modified", path: "src/auth.ts" },
{ type: "file_modified", path: "src/user.ts" },
// ...
Pitfall 4: Not Handling Event Ordering
// Bad: Assuming timestamp ordering is sufficient (clocks can skew, and two events can share a timestamp)
events.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
// Good: Use sequence numbers within a thread
events.sort((a, b) => a.sequenceNumber - b.sequenceNumber);
// For cross-thread: use causal ordering or vector clocks
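For the cross-thread case, a lightweight form of causal ordering is to use the recorded parent links directly: place every event after its parent, and fall back to timestamps only where no link exists. The sketch below assumes a simplified `CausalEv` shape with numeric timestamps. `causalSort` is a hypothetical helper, and it captures only the causality recorded in `parentEventId`, which is weaker than vector clocks.

```typescript
interface CausalEv {
  id: string;
  parentEventId?: string;
  timestamp: number; // Milliseconds; may come from different, skewed clocks
}

// Order events so that each one appears after its causal parent, when the parent
// is present in the input. Timestamps only break ties between unrelated events.
function causalSort(events: CausalEv[]): CausalEv[] {
  const byId = new Map<string, CausalEv>();
  for (const e of events) byId.set(e.id, e);

  const sorted: CausalEv[] = [];
  const visited = new Set<string>();

  const visit = (e: CausalEv): void => {
    if (visited.has(e.id)) return; // Also guards against accidental cycles
    visited.add(e.id);
    const parentEvent = e.parentEventId ? byId.get(e.parentEventId) : undefined;
    if (parentEvent) visit(parentEvent); // Emit the parent first
    sorted.push(e);
  };

  [...events].sort((a, b) => a.timestamp - b.timestamp).forEach(visit);
  return sorted;
}

const mixed: CausalEv[] = [
  // The sub-agent's clock runs behind, so its event looks earlier than its cause.
  { id: "evt_124", parentEventId: "evt_123", timestamp: 90 },
  { id: "evt_123", timestamp: 100 },
  { id: "evt_125", timestamp: 110 },
];

console.log(causalSort(mixed).map(e => e.id));
// [ 'evt_123', 'evt_124', 'evt_125' ]
```

A plain timestamp sort would have placed `evt_124` before the spawn event that caused it. The recursion follows parent chains, so very long chains would call for an iterative version.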
Related
- Agent Memory Patterns – Overview of memory tiers including event sourcing
- 12 Factor Agents – Factors 5 and 6 define event-based state
- Human in the Loop Patterns – Approval workflows using event logs
- Clean Slate Trajectory Recovery – When to start fresh vs. resume
- The RALPH Loop – File-based memory as lightweight event sourcing
References
- Martin Fowler: Event Sourcing – Foundational article
- HumanLayer: 12 Factor Agents – Agent-specific event patterns
- Greg Young: CQRS and Event Sourcing – Deep dive on CQRS
- Designing Data-Intensive Applications – Chapter 11 covers event sourcing

