
Building Multi-Agent Memory Systems: Advanced Architectures

Production systems often require multiple agents working together: a sales agent negotiating with a customer while a backend agent checks inventory and pricing; a research team of agents exploring a complex problem from different angles; customer-support escalations where a human takes over from an agent and must access the full context. Shared memory systems enable coordination: agents can access each other's findings, avoid duplicate work, and escalate context reliably.

Motivations for Multi-Agent Memory

Single-agent memory systems manage context for one agent. Multi-agent memory systems must solve additional problems: (1) information sharing — Agent A's findings must be visible to Agent B without duplication, (2) coordination — preventing agents from taking conflicting actions, (3) escalation — when a task requires human intervention, the human must access full context, (4) learning — insights from one agent's failure or success should inform others, and (5) privacy — some agents may have access restrictions (e.g., financial agents see sensitive data; support agents don't).

For example, in a customer acquisition workflow: Agent1 (cold outreach) logs a prospect's contact info; Agent2 (research) enriches it with company data; Agent3 (sales) prepares a pitch. All three need fast, reliable access to shared findings.

Shared Memory Architectures

Three main patterns emerge:

1. Centralized Shared Memory

All agents write to and read from a central repository (a database or message queue). This is the simplest pattern to implement, but the central store is a single point of failure.

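```python
# Example: Centralized shared memory (SQLite via the standard sqlite3 module)
import json
import sqlite3
from typing import Any, Dict, List, Optional


class SharedMemory:
    """Centralized episodic + semantic memory for a multi-agent team."""

    def __init__(self, db_connection: sqlite3.Connection):
        self.db = db_connection
        self.ensure_tables()

    def ensure_tables(self):
        """Create the tables this class uses, if they don't exist yet."""
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS shared_observations (
                id INTEGER PRIMARY KEY, task_id TEXT, agent_id TEXT,
                observation TEXT, tags TEXT, timestamp TEXT);
            CREATE TABLE IF NOT EXISTS agent_decisions (
                id INTEGER PRIMARY KEY, task_id TEXT, agent_id TEXT,
                decision TEXT, reasoning TEXT, timestamp TEXT);
        """)

    @staticmethod
    def row_to_observation(row) -> Dict[str, Any]:
        """Map an (agent_id, observation, tags, timestamp) row to a dict."""
        agent_id, observation, tags, timestamp = row
        return {"agent_id": agent_id, "observation": json.loads(observation),
                "tags": json.loads(tags), "timestamp": timestamp}

    def log_observation(self, agent_id: str, task_id: str, observation: Dict[str, Any],
                        tags: Optional[List[str]] = None):
        """
        Log an observation from an agent for a given task.
        Tags enable filtering: ["prospect_data", "company_info", "competitor_intel"]
        """
        self.db.execute("""
            INSERT INTO shared_observations (agent_id, task_id, observation, tags, timestamp)
            VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
        """, (agent_id, task_id, json.dumps(observation), json.dumps(tags or [])))
        self.db.commit()

    def get_observations_for_task(self, task_id: str, agent_id: Optional[str] = None) -> list:
        """
        Retrieve the 50 most recent observations for a task.
        If agent_id is given, that agent's own observations are excluded.
        """
        query = ("SELECT agent_id, observation, tags, timestamp "
                 "FROM shared_observations WHERE task_id = ?")
        params = [task_id]

        if agent_id:
            query += " AND agent_id != ?"  # Exclude own observations
            params.append(agent_id)

        query += " ORDER BY timestamp DESC LIMIT 50"

        rows = self.db.execute(query, params).fetchall()
        return [self.row_to_observation(row) for row in rows]

    def search_observations(self, query: str, agent_id: Optional[str] = None) -> list:
        """
        Substring search (SQL LIKE) across all observations.
        Useful for discovering relevant findings from other agents; for true
        full-text search, use a dedicated index such as SQLite FTS5.
        """
        sql = ("SELECT agent_id, observation, tags, timestamp "
               "FROM shared_observations WHERE observation LIKE ?")
        params = [f"%{query}%"]

        if agent_id:
            sql += " AND agent_id != ?"
            params.append(agent_id)

        rows = self.db.execute(sql, params).fetchall()
        return [self.row_to_observation(row) for row in rows]

    def record_agent_decision(self, agent_id: str, task_id: str,
                              decision: Dict[str, Any], reasoning: str):
        """
        Log a significant decision for other agents to learn from.
        """
        self.db.execute("""
            INSERT INTO agent_decisions (agent_id, task_id, decision, reasoning, timestamp)
            VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
        """, (agent_id, task_id, json.dumps(decision), reasoning))
        self.db.commit()
```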

2. Federated Memory with Sync

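Each agent maintains its own memory and broadcasts important facts to the other agents through a message broker; receivers cache those facts locally. This reduces contention on a single shared store, at the cost of eventual consistency: an agent may act before another agent's latest findings have reached it.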

```python
# Example: Federated memory with periodic sync
from typing import Any, Dict, List


class FederatedAgentMemory:
    def __init__(self, agent_id: str, local_db, sync_broker):
        self.agent_id = agent_id
        self.local_db = local_db        # Local episodic store; assumed to expose insert(table, row)
        self.sync_broker = sync_broker  # Message broker (Kafka, RabbitMQ, Redis)
        self.remote_cache = {}          # Cache of other agents' shared facts

    def observe(self, observation: Dict[str, Any]):
        """
        Log a local observation.
        If it is important, broadcast it to other agents.
        """
        self.local_db.insert("observations", observation)

        # Decide whether it is worth sharing
        if self._is_shareable(observation):
            self.sync_broker.publish(f"agent.{self.agent_id}.observation", observation)

    def _is_shareable(self, observation: Dict[str, Any]) -> bool:
        """Heuristic: should this observation be shared with other agents?"""
        importance = observation.get("importance", 0)
        return importance >= 0.7  # Share high-importance observations only

    def receive_broadcast(self, observation: Dict[str, Any], from_agent_id: str):
        """
        Receive a broadcast observation from another agent.
        Cache it locally for quick access.
        """
        cache_key = f"{from_agent_id}_{observation.get('id')}"
        self.remote_cache[cache_key] = observation

    def get_shared_findings(self, query: str) -> List[Dict[str, Any]]:
        """
        Case-insensitive substring search over findings cached from other agents.
        """
        return [obs for obs in self.remote_cache.values()
                if query.lower() in str(obs).lower()]
```


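```python
# Example: Sync broker using Redis (redis-py client)
import json
import threading
from typing import Any, Callable, Dict


class RedisSyncBroker:
    def __init__(self, redis_client):
        self.redis = redis_client

    def publish(self, topic: str, message: Dict[str, Any]):
        """Publish an observation to all subscribers."""
        self.redis.publish(topic, json.dumps(message))

    def subscribe(self, agent_id: str, callback: Callable[[Dict[str, Any], str], None]):
        """
        Subscribe an agent to updates from other agents.
        callback(data, from_agent) is called for each broadcast received.
        """
        pubsub = self.redis.pubsub()
        pubsub.psubscribe("agent.*")  # Subscribe before the thread starts, so no messages are missed

        def listen():
            for message in pubsub.listen():
                if message["type"] != "pmessage":
                    continue  # Skip subscription confirmations
                # The sender is in the channel name ("agent.<id>.observation"),
                # not in the pattern ("agent.*")
                channel = message["channel"]
                if isinstance(channel, bytes):
                    channel = channel.decode()
                from_agent = channel.split(".")[1]  # Assumes agent ids contain no "."

                if from_agent != agent_id:  # Skip own messages
                    callback(json.loads(message["data"]), from_agent)

        # Run the listener in a background thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
        return listener_thread
```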
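For local experiments without a Redis server, the broadcast flow can be imitated in-process. This is a minimal sketch under stated assumptions: `InProcessBroker` and `MiniAgent` are hypothetical names, and delivery is deferred until `deliver()` is called so that the eventual-consistency window of the federated pattern becomes visible.

```python
from collections import deque
from typing import Any, Callable, Dict, List, Tuple

Callback = Callable[[Dict[str, Any], str], None]


class InProcessBroker:
    """Hypothetical in-memory stand-in for a message broker (no Redis needed)."""

    def __init__(self):
        self.subscribers: Dict[str, Callback] = {}
        self.pending: deque = deque()  # (from_agent, message) pairs not yet delivered

    def subscribe(self, agent_id: str, callback: Callback):
        self.subscribers[agent_id] = callback

    def publish(self, from_agent: str, message: Dict[str, Any]):
        # Queue only: nothing is visible to other agents until deliver() runs
        self.pending.append((from_agent, message))

    def deliver(self):
        """Flush queued messages to every subscriber except the sender."""
        while self.pending:
            from_agent, message = self.pending.popleft()
            for agent_id, callback in self.subscribers.items():
                if agent_id != from_agent:
                    callback(message, from_agent)


class MiniAgent:
    """Hypothetical agent: local log plus a cache of other agents' broadcasts."""

    def __init__(self, agent_id: str, broker: InProcessBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.local: List[Dict[str, Any]] = []
        self.remote: List[Tuple[str, Dict[str, Any]]] = []
        broker.subscribe(agent_id, self.receive)

    def observe(self, observation: Dict[str, Any]):
        self.local.append(observation)
        if observation.get("importance", 0) >= 0.7:  # Same threshold as _is_shareable
            self.broker.publish(self.agent_id, observation)

    def receive(self, observation: Dict[str, Any], from_agent: str):
        self.remote.append((from_agent, observation))


broker = InProcessBroker()
research = MiniAgent("research", broker)
sales = MiniAgent("sales", broker)

research.observe({"id": 1, "fact": "Acme is hiring 40 engineers", "importance": 0.9})
research.observe({"id": 2, "fact": "Website uses a blue logo", "importance": 0.2})
print(len(sales.remote))  # 0: broadcast not yet delivered
broker.deliver()
print(len(sales.remote))  # 1: only the high-importance fact was shared
```

Until `deliver()` runs, the sales agent has no view of the research agent's finding; a real broker shortens that window but does not eliminate it.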

3. Hybrid: Centralized with Local Caching

Agents maintain a local cache of frequently-accessed facts; periodically refresh from central store. Balances consistency and latency.

```python
# Example: Hybrid architecture with local cache
import time


class HybridAgentMemory:
    def __init__(self, agent_id: str, central_memory, cache_ttl_seconds: int = 300):
        self.agent_id = agent_id
        # Central shared store; assumed to expose get_fact(fact_id)
        self.central = central_memory
        self.local_cache = {}               # { fact_id: (value, cached_at) }, cached_at from time.monotonic()
        self.cache_ttl = cache_ttl_seconds  # Default 300 s = 5 minutes

    def get_fact(self, fact_id: str):
        """
        Retrieve a fact: check the local cache first, then the central store.
        """
        # Check local cache
        if fact_id in self.local_cache:
            value, cached_at = self.local_cache[fact_id]
            age_seconds = time.monotonic() - cached_at

            if age_seconds < self.cache_ttl:
                return value  # Cache hit (may be up to cache_ttl seconds stale)

        # Cache miss or expired: fetch from the central store
        value = self.central.get_fact(fact_id)
        self.local_cache[fact_id] = (value, time.monotonic())
        return value

    def invalidate_cache(self, fact_id: str):
        """Invalidate a cache entry (call when the central fact is updated)."""
        self.local_cache.pop(fact_id, None)

    def clear_cache(self):
        """Clear the entire local cache."""
        self.local_cache.clear()
```
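The price of the hybrid design is a bounded staleness window: after another agent updates the central store, a cached reader can keep returning the old value until its entry expires or is invalidated. The sketch below makes that visible. It is illustrative only: `TTLCachedReader` is a hypothetical class, the central store is assumed to be a plain dict, and the clock is injected so the TTL can be advanced deterministically.

```python
from typing import Any, Callable, Dict, Tuple


class TTLCachedReader:
    """Read-through cache with a per-entry TTL (illustrative sketch)."""

    def __init__(self, central: Dict[str, Any], ttl_seconds: float, clock: Callable[[], float]):
        self.central = central  # Hypothetical central store: a plain dict
        self.ttl = ttl_seconds
        self.clock = clock      # Returns the current time in seconds
        self.cache: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str) -> Any:
        if key in self.cache:
            value, cached_at = self.cache[key]
            if self.clock() - cached_at < self.ttl:
                return value    # Fast, but possibly stale
        value = self.central.get(key)
        self.cache[key] = (value, self.clock())
        return value

    def invalidate(self, key: str):
        self.cache.pop(key, None)


now = [0.0]  # Fake clock that we advance by hand
central = {"acme.budget": "approved"}
reader = TTLCachedReader(central, ttl_seconds=300, clock=lambda: now[0])

print(reader.get("acme.budget"))  # approved (fetched from central)
central["acme.budget"] = "frozen"  # Another agent updates the central store
now[0] = 120
print(reader.get("acme.budget"))  # approved (stale hit: 120 s < 300 s TTL)
now[0] = 301
print(reader.get("acme.budget"))  # frozen (entry expired and was refetched)
```

Calling `invalidate` as soon as an update notification arrives, which is the role of `invalidate_cache` above, closes the staleness window early.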

Memory Synchronization and Consistency

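When multiple agents update the same shared facts, a plain read-modify-write can silently lose updates: two agents read version 3, and the second agent's write overwrites the first. Versioning with optimistic locking detects this conflict so the losing agent can reload and retry: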

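```python
# Example: Memory versioning for consistency (SQLite-style SQL; assumes a
# shared_facts(id PRIMARY KEY, value, version, updated_by, updated_at) table)
import json


class VersionConflictError(Exception):
    """Raised when another agent updated the fact after it was read."""


class VersionedSharedMemory:
    def __init__(self, db_connection):
        self.db = db_connection

    def read_with_version(self, fact_id: str):
        """Read a fact and its version; returns (None, 0) if the fact doesn't exist."""
        row = self.db.execute(
            "SELECT value, version FROM shared_facts WHERE id = ?",
            (fact_id,)
        ).fetchone()
        if row is None:
            return None, 0
        value, version = row
        return json.loads(value), version

    def write_if_unchanged(self, fact_id: str, new_value: dict, expected_version: int, agent_id: str):
        """
        Update a fact only if its version hasn't changed since it was read
        (optimistic locking). Prevents concurrent agents from silently
        overwriting each other.
        """
        if expected_version == 0:
            # New fact: the PRIMARY KEY makes a competing second insert a no-op
            cur = self.db.execute("""
                INSERT OR IGNORE INTO shared_facts (id, value, version, updated_by, updated_at)
                VALUES (?, ?, 1, ?, CURRENT_TIMESTAMP)
            """, (fact_id, json.dumps(new_value), agent_id))
        else:
            cur = self.db.execute("""
                UPDATE shared_facts
                SET value = ?, version = version + 1, updated_by = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ? AND version = ?
            """, (json.dumps(new_value), agent_id, fact_id, expected_version))
        self.db.commit()

        if cur.rowcount == 0:
            raise VersionConflictError(
                f"Fact {fact_id} was updated by another agent. Reload and retry."
            )
```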
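Callers typically wrap optimistic writes in a read-modify-write retry loop. The sketch below is self-contained and illustrative: it uses an in-memory SQLite database, a simplified hypothetical `shared_facts` schema, and a hypothetical `update_with_retry` helper, and it simulates a second agent writing between the read and the write, which forces exactly one retry.

```python
import json
import sqlite3


class VersionConflictError(Exception):
    pass


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE shared_facts (id TEXT PRIMARY KEY, value TEXT, version INTEGER)")
db.execute("INSERT INTO shared_facts VALUES ('deal:42', ?, 1)", (json.dumps({"stage": "pitch"}),))


def read(fact_id):
    value, version = db.execute(
        "SELECT value, version FROM shared_facts WHERE id = ?", (fact_id,)).fetchone()
    return json.loads(value), version


def write_if_unchanged(fact_id, new_value, expected_version):
    cur = db.execute(
        "UPDATE shared_facts SET value = ?, version = version + 1 WHERE id = ? AND version = ?",
        (json.dumps(new_value), fact_id, expected_version))
    if cur.rowcount == 0:
        raise VersionConflictError(fact_id)


def update_with_retry(fact_id, modify, max_attempts=3, before_write=None):
    """Re-read and re-apply `modify` until the conditional write succeeds."""
    for attempt in range(1, max_attempts + 1):
        value, version = read(fact_id)
        new_value = modify(dict(value))
        if before_write:
            before_write(attempt)  # Test hook: lets us inject a concurrent write
        try:
            write_if_unchanged(fact_id, new_value, version)
            return attempt
        except VersionConflictError:
            continue  # Another agent won the race; reload and try again
    raise VersionConflictError(f"{fact_id}: gave up after {max_attempts} attempts")


def other_agent_writes(attempt):
    if attempt == 1:  # Simulate a competing agent on the first attempt only
        value, version = read("deal:42")
        write_if_unchanged("deal:42", {**value, "owner": "agent_b"}, version)


attempts = update_with_retry("deal:42", lambda v: {**v, "stage": "negotiation"},
                             before_write=other_agent_writes)
print(attempts, read("deal:42"))
# 2 ({'stage': 'negotiation', 'owner': 'agent_b'}, 3)
```

Because the second attempt re-reads before modifying, both agents' changes survive; a blind overwrite would have dropped `owner`.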

Cross-Agent Learning and Knowledge Propagation

When one agent discovers something valuable (e.g., "This prospect is not a good fit"), that knowledge should inform other agents:

```python
# Example: Cross-agent learning (SQLite-style SQL; JSON_EXTRACT needs JSON support in the database)
import json
from typing import List


class TeamLearningSystem:
    def __init__(self, shared_memory: SharedMemory):
        self.shared = shared_memory

    def log_learning_signal(self, agent_id: str, signal_type: str, details: dict):
        """
        Log a learning signal (success, failure, pattern discovered).
        signal_type: "success", "failure", "pattern", "recommendation"
        """
        self.shared.db.execute("""
            INSERT INTO team_learnings (agent_id, signal_type, details, timestamp)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
        """, (agent_id, signal_type, json.dumps(details)))
        self.shared.db.commit()

    def get_relevant_learnings(self, agent_id: str, context: dict) -> list:
        """
        Retrieve learnings from other agents relevant to the current context.
        For example, if a sales agent is pitching to an industry, find learnings
        from other agents who pitched to similar companies.
        """
        industry = context.get("industry")
        company_size = context.get("company_size")

        # Find learnings from similar pitches by other agents
        similar_learnings = self.shared.db.execute("""
            SELECT * FROM team_learnings
            WHERE signal_type IN ('success', 'failure')
              AND JSON_EXTRACT(details, '$.industry') = ?
              AND JSON_EXTRACT(details, '$.company_size') = ?
              AND agent_id != ?
            ORDER BY timestamp DESC
            LIMIT 10
        """, (industry, company_size, agent_id)).fetchall()

        return similar_learnings

    def propagate_insight(self, agent_id: str, insight: str, affected_agents: List[str]):
        """
        Send a critical insight to specific agents.
        E.g., "Client budget cut; all deals on hold for 30 days."
        """
        for target_agent_id in affected_agents:
            self.shared.db.execute("""
                INSERT INTO agent_alerts (from_agent, to_agent, insight, timestamp)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            """, (agent_id, target_agent_id, insight))
        self.shared.db.commit()
```

Escalation Handoff: Human Takeover

When a task escalates from agent to human, the human must access full context. Design a clean handoff:

```python
# Example: Escalation handoff
import json
from datetime import datetime, timezone


class EscalationManager:
    def __init__(self, shared_memory: SharedMemory, notify_human=None):
        self.shared = shared_memory
        # Hypothetical notification hook (email, chat, ticketing system),
        # called as notify_human(recipient, task_id, reason)
        self.notify_human = notify_human

    def escalate_to_human(self, agent_id: str, task_id: str, reason: str, assigned_to: str = None):
        """
        Escalate a task from an agent to a human.
        Prepare a handoff package with the full context.
        """
        # Gather all relevant context
        task_observations = self.shared.get_observations_for_task(task_id)
        cur = self.shared.db.execute(
            "SELECT * FROM agent_decisions WHERE task_id = ? ORDER BY timestamp DESC",
            (task_id,)
        )
        # Convert rows to dicts so the package can be serialized to JSON
        columns = [col[0] for col in cur.description]
        agent_decisions = [dict(zip(columns, row)) for row in cur.fetchall()]

        # Create a handoff summary
        handoff_package = {
            "escalated_by_agent": agent_id,
            "reason": reason,
            "assigned_to": assigned_to,
            "escalation_time": datetime.now(timezone.utc).isoformat(),
            "task_id": task_id,
            "full_context": {
                "observations": task_observations,
                "decisions": agent_decisions,
                "status": "ready for human review"
            }
        }

        # Store for human review
        self.shared.db.execute("""
            INSERT INTO escalations (task_id, package, status, created_at)
            VALUES (?, ?, 'pending_human_review', CURRENT_TIMESTAMP)
        """, (task_id, json.dumps(handoff_package, default=str)))
        self.shared.db.commit()

        # Notify a human (or the default queue)
        if self.notify_human:
            self.notify_human(assigned_to or "support_queue", task_id, reason)

        return handoff_package
```

Key Takeaways

  • Multi-agent memory systems require shared observation logging, decision recording, and cross-agent learning.
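  • Choose an architecture: centralized (simple, but a single point of failure), federated (resilient, but only eventually consistent), or hybrid (low-latency reads from a local cache, at the cost of values that may be stale for up to the cache TTL).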
  • Use versioning and optimistic locking to prevent concurrent agents from overwriting each other's updates.
  • Propagate learnings across agents: success patterns, failure modes, and insights help the team improve collectively.
  • Design clean escalation handoffs: when a human takes over from an agent, package full context for seamless continuity.

Frequently Asked Questions

How do I prevent two agents from taking conflicting actions?

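Claim the task atomically. When Agent1 starts on a task, set task.in_progress = true and task.assigned_to = agent1 in a single conditional update that succeeds only if the task is still unclaimed (e.g., UPDATE ... WHERE in_progress = false). A separate check followed by a write is racy: two agents can both see the task as free and both claim it. Optimistic locking applies the same compare-and-set idea to shared facts, so concurrent updates are not overwritten.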
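A minimal sketch of an atomic claim, assuming a hypothetical `tasks` table in an in-memory SQLite database: the conditional `UPDATE` succeeds for at most one agent, and `rowcount` tells each agent whether it won.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, in_progress INTEGER, assigned_to TEXT)")
db.execute("INSERT INTO tasks VALUES ('task-7', 0, NULL)")


def try_claim(task_id: str, agent_id: str) -> bool:
    """Claim a task only if nobody holds it; check and set happen in one statement."""
    cur = db.execute(
        "UPDATE tasks SET in_progress = 1, assigned_to = ? WHERE id = ? AND in_progress = 0",
        (agent_id, task_id))
    db.commit()
    return cur.rowcount == 1


def release(task_id: str, agent_id: str) -> None:
    """Release a task, but only if this agent is the one holding it."""
    db.execute(
        "UPDATE tasks SET in_progress = 0, assigned_to = NULL WHERE id = ? AND assigned_to = ?",
        (task_id, agent_id))
    db.commit()


print(try_claim("task-7", "agent1"))  # True
print(try_claim("task-7", "agent2"))  # False: already claimed
release("task-7", "agent1")
print(try_claim("task-7", "agent2"))  # True
```

A production version would also record a claim time so that a crashed agent's claim can expire; that lease logic is omitted here.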

How much context should Agent B see from Agent A's work?

Share observations and high-level decisions, not internal reasoning or debug logs. Tag observations: "prospect_data", "company_intel", "outreach_attempt". Agents filter by relevance.
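One way to enforce that split is to strip private fields before anything is written to shared memory and let readers filter by tag. A sketch, where the field names in `PRIVATE_FIELDS` and the record layout are illustrative assumptions:

```python
from typing import Any, Dict, Iterable, List, Set

PRIVATE_FIELDS = {"reasoning", "debug", "scratchpad"}  # Hypothetical internal-only keys


def to_shared(record: Dict[str, Any]) -> Dict[str, Any]:
    """Drop internal fields so only observations and decisions are shared."""
    return {k: v for k, v in record.items() if k not in PRIVATE_FIELDS}


def relevant(records: Iterable[Dict[str, Any]], wanted_tags: Set[str]) -> List[Dict[str, Any]]:
    """Keep records that carry at least one of the reader's tags."""
    return [r for r in records if wanted_tags & set(r.get("tags", []))]


log = [
    to_shared({"agent": "research", "tags": ["company_intel"],
               "fact": "Acme raised a Series B", "reasoning": "found in press release"}),
    to_shared({"agent": "outreach", "tags": ["outreach_attempt"],
               "fact": "Email 1 sent", "debug": {"smtp": 250}}),
]
for r in relevant(log, {"company_intel", "prospect_data"}):
    print(r)
# {'agent': 'research', 'tags': ['company_intel'], 'fact': 'Acme raised a Series B'}
```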

Should agents communicate directly or only through shared memory?

Shared memory is more scalable (no point-to-point connections), but direct communication (message queues, APIs) allows real-time notifications. Hybrid: agents log to shared memory (durable), and also push urgent alerts via message queue (fast notification).
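A minimal sketch of that hybrid, using an in-process list as a stand-in for the durable shared store and one `queue.Queue` per agent as a stand-in for the message queue. Both are assumptions for illustration; in production they would be a database and a broker, and `TeamBus` is a hypothetical name.

```python
import queue
from typing import Any, Dict, List


class TeamBus:
    """Durable log for everything; per-agent queues for urgent alerts only."""

    def __init__(self, agent_ids: List[str]):
        self.log: List[Dict[str, Any]] = []  # Stand-in for durable shared memory
        self.inboxes = {a: queue.Queue() for a in agent_ids}

    def post(self, from_agent: str, entry: Dict[str, Any], urgent: bool = False):
        record = {"from": from_agent, **entry}
        self.log.append(record)  # Always persisted first
        if urgent:
            for agent_id, inbox in self.inboxes.items():
                if agent_id != from_agent:
                    inbox.put(record)  # Fast push notification

    def drain(self, agent_id: str) -> List[Dict[str, Any]]:
        """Non-blocking read of all pending urgent alerts for one agent."""
        alerts = []
        inbox = self.inboxes[agent_id]
        while True:
            try:
                alerts.append(inbox.get_nowait())
            except queue.Empty:
                return alerts


bus = TeamBus(["research", "sales"])
bus.post("research", {"note": "Acme uses vendor X"})
bus.post("research", {"note": "Acme budget frozen"}, urgent=True)
print(len(bus.log), bus.drain("sales"))
# 2 [{'from': 'research', 'note': 'Acme budget frozen'}]
```

Writing to the log before pushing means an agent that misses a notification can still find the alert in shared memory later.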

How do I handle private/confidential information in shared memory?

Use access control: tag facts by sensitivity level (public, internal, confidential). Agents can only read facts they're authorized for. Encrypt sensitive facts at rest.
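Read-side filtering by sensitivity level can be sketched as below. The three levels come from the answer above; the clearance mapping and agent names are illustrative assumptions, and encryption at rest is left to the storage layer.

```python
from enum import IntEnum
from typing import Any, Dict, List


class Sensitivity(IntEnum):
    PUBLIC = 0
    INTERNAL = 1
    CONFIDENTIAL = 2


# Hypothetical clearances: the highest level each agent may read
CLEARANCE = {
    "finance_agent": Sensitivity.CONFIDENTIAL,
    "sales_agent": Sensitivity.INTERNAL,
    "support_agent": Sensitivity.PUBLIC,
}


def readable_facts(agent_id: str, facts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return facts at or below the agent's clearance; unknown agents see public facts only."""
    level = CLEARANCE.get(agent_id, Sensitivity.PUBLIC)
    return [f for f in facts if f["sensitivity"] <= level]


facts = [
    {"text": "Acme is a prospect", "sensitivity": Sensitivity.PUBLIC},
    {"text": "Acme pitch deck v2", "sensitivity": Sensitivity.INTERNAL},
    {"text": "Acme credit limit", "sensitivity": Sensitivity.CONFIDENTIAL},
]
print([f["text"] for f in readable_facts("sales_agent", facts)])
# ['Acme is a prospect', 'Acme pitch deck v2']
```

In a real deployment the same check belongs in the query layer (for example, in the SQL `WHERE` clause), so unauthorized rows never reach the agent at all.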

How do I measure whether multi-agent learning is working?

Track: (1) task completion time (declining = agents learning to coordinate better), (2) rework rate (declining = agents learning from each other's mistakes), (3) escalation rate (declining = agents handling more independently).
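The three metrics can be computed per period from task records and compared over time. A sketch assuming a hypothetical record shape with `duration_min`, `reworked`, and `escalated` fields; the numbers below are made-up sample data, not measurements.

```python
from statistics import mean
from typing import Dict, List


def team_metrics(tasks: List[Dict]) -> Dict[str, float]:
    """Average completion time plus rework and escalation rates for one period."""
    n = len(tasks)
    return {
        "avg_completion_min": mean(t["duration_min"] for t in tasks),
        "rework_rate": sum(t["reworked"] for t in tasks) / n,
        "escalation_rate": sum(t["escalated"] for t in tasks) / n,
    }


# Illustrative sample data only
last_month = [
    {"duration_min": 50, "reworked": True, "escalated": True},
    {"duration_min": 40, "reworked": False, "escalated": False},
    {"duration_min": 60, "reworked": True, "escalated": False},
    {"duration_min": 30, "reworked": False, "escalated": False},
]
this_month = [
    {"duration_min": 35, "reworked": False, "escalated": False},
    {"duration_min": 45, "reworked": True, "escalated": False},
    {"duration_min": 25, "reworked": False, "escalated": False},
    {"duration_min": 35, "reworked": False, "escalated": True},
]
before, after = team_metrics(last_month), team_metrics(this_month)
for name in before:
    trend = "improving" if after[name] < before[name] else "not improving"
    print(f"{name}: {before[name]} -> {after[name]} ({trend})")
```

A decline is only suggestive on its own, because the mix of tasks also changes from period to period; comparing similar task types makes the trend more trustworthy.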

Further Reading