Enterprise Agents¶
Build multi-tenant agents for production environments.
Pattern Overview¶
An enterprise agent: - Serves multiple customers with isolated data - Handles policy updates and conflict resolution - Scales horizontally - Maintains audit trails
Multi-Tenant Agent¶
from graphmem import GraphMem, MemoryConfig
class MultiTenantAgent:
"""Agent that serves multiple customers with isolated memories."""
def __init__(self, base_config: dict):
self.base_config = base_config
self.memories = {} # tenant_id -> GraphMem
def get_memory(self, tenant_id: str) -> GraphMem:
"""Get or create memory for a tenant."""
if tenant_id not in self.memories:
config = MemoryConfig(
**self.base_config,
user_id=tenant_id,
memory_id=f"tenant_{tenant_id}",
turso_db_path=f"tenants/{tenant_id}/memory.db",
)
self.memories[tenant_id] = GraphMem(config, memory_id=f"tenant_{tenant_id}", user_id=tenant_id)
return self.memories[tenant_id]
def ingest(self, tenant_id: str, content: str):
"""Ingest content for a specific tenant."""
memory = self.get_memory(tenant_id)
return memory.ingest(content)
def query(self, tenant_id: str, question: str) -> str:
"""Query a specific tenant's memory."""
memory = self.get_memory(tenant_id)
response = memory.query(question)
return response.answer
# Usage with OpenAI
base_config = {
"llm_provider": "openai",
"llm_api_key": "sk-...",
"llm_model": "gpt-4o-mini",
"embedding_provider": "openai",
"embedding_api_key": "sk-...",
"embedding_model": "text-embedding-3-small",
}
# Or with OpenRouter (custom base URL)
base_config_openrouter = {
"llm_provider": "openai_compatible",
"llm_api_key": "sk-or-v1-...",
"llm_api_base": "https://openrouter.ai/api/v1", # Custom base URL
"llm_model": "google/gemini-2.0-flash-001",
"embedding_provider": "openai_compatible",
"embedding_api_key": "sk-or-v1-...",
"embedding_api_base": "https://openrouter.ai/api/v1", # Custom base URL
"embedding_model": "openai/text-embedding-3-small",
}
# Or with Azure OpenAI
base_config_azure = {
"llm_provider": "azure_openai",
"llm_api_key": "your-azure-key",
"llm_api_base": "https://your-resource.openai.azure.com/", # Azure endpoint
"llm_model": "gpt-4",
"azure_deployment": "gpt-4",
"azure_api_version": "2024-02-15-preview",
"embedding_provider": "azure_openai",
"embedding_api_key": "your-azure-key",
"embedding_api_base": "https://your-resource.openai.azure.com/", # Azure endpoint
"embedding_model": "text-embedding-ada-002",
"azure_embedding_deployment": "text-embedding-ada-002",
}
agent = MultiTenantAgent(base_config)
# Each tenant's data is completely isolated
agent.ingest("acme", "Acme's product costs $99.")
agent.ingest("globex", "Globex's product costs $149.")
print(agent.query("acme", "What's the product price?")) # → "$99"
print(agent.query("globex", "What's the product price?")) # → "$149"
Customer Support Agent¶
from graphmem import GraphMem, MemoryConfig, MemoryImportance
class SupportAgent:
"""Agent for customer support with policy updates."""
def __init__(self, company_id: str):
self.config = MemoryConfig(
llm_provider="openai",
llm_api_key="sk-...",
llm_model="gpt-4o",
embedding_provider="openai",
embedding_api_key="sk-...",
embedding_model="text-embedding-3-small",
turso_db_path=f"support/{company_id}.db",
evolution_enabled=True,
decay_enabled=True,
)
self.memory = GraphMem(self.config, memory_id=f"support_{company_id}", user_id=company_id)
def update_policy(self, content: str, effective_date: str):
"""Update knowledge base with new policy."""
enriched = f"""
POLICY UPDATE (Effective: {effective_date})
{content}
"""
self.memory.ingest(
enriched,
importance=MemoryImportance.CRITICAL,
)
# Resolve conflicts with old policies
self.memory.evolve()
def answer_ticket(self, ticket: str) -> dict:
"""Answer a support ticket."""
response = self.memory.query(ticket)
return {
"answer": response.answer,
"confidence": response.confidence,
"needs_escalation": response.confidence < 0.6,
}
# Usage
agent = SupportAgent("acme_corp")
# Initial policy
agent.update_policy(
"Returns accepted within 30 days. Refunds in 5-7 days.",
"2023-01-01"
)
# Policy update - old one automatically superseded
agent.update_policy(
"Returns accepted within 60 days. Refunds in 2-3 days.",
"2024-01-01"
)
# Agent uses current policy
result = agent.answer_ticket("What's your return policy?")
# → "Returns within 60 days, refunds in 2-3 days"
Enterprise Knowledge Base¶
from graphmem import GraphMem, MemoryConfig, MemoryImportance
from datetime import datetime
class EnterpriseKB:
"""Enterprise knowledge base with audit trail."""
def __init__(self, org_id: str):
self.config = MemoryConfig(
llm_provider="azure_openai",
llm_api_key="...",
llm_api_base="https://your-resource.openai.azure.com/", # Azure endpoint
azure_deployment="gpt-4",
llm_model="gpt-4",
azure_api_version="2024-02-15-preview",
embedding_provider="azure_openai",
embedding_api_key="...",
embedding_api_base="https://your-resource.openai.azure.com/", # Azure endpoint
azure_embedding_deployment="text-embedding-ada-002",
embedding_model="text-embedding-ada-002",
neo4j_uri="neo4j+s://...",
neo4j_password="...",
redis_url="redis://...",
user_id=org_id,
evolution_enabled=True,
)
self.memory = GraphMem(self.config, memory_id=f"org_{org_id}", user_id=org_id)
self.org_id = org_id
def add_policy(self, name: str, content: str, effective: str):
"""Add or update a policy document."""
enriched = f"""
POLICY: {name}
Effective Date: {effective}
Last Updated: {datetime.now().isoformat()}
{content}
"""
self.memory.ingest(
enriched,
metadata={"type": "policy", "name": name},
importance=MemoryImportance.CRITICAL,
)
self.memory.evolve()
def add_procedure(self, name: str, steps: list[str]):
"""Add a procedure manual."""
content = f"PROCEDURE: {name}\n\n"
content += "\n".join([f"{i+1}. {s}" for i, s in enumerate(steps)])
self.memory.ingest(
content,
metadata={"type": "procedure", "name": name},
importance=MemoryImportance.HIGH,
)
def add_employee(self, data: dict):
"""Add employee information."""
content = f"""
EMPLOYEE: {data['name']}
Title: {data['title']}
Department: {data['department']}
Email: {data['email']}
Reports to: {data.get('manager', 'N/A')}
Start date: {data.get('start_date', 'N/A')}
"""
self.memory.ingest(
content,
metadata={"type": "employee", **data},
importance=MemoryImportance.MEDIUM,
)
def answer(self, question: str, department: str = None) -> dict:
"""Answer a question from the knowledge base."""
if department:
question = f"[{department}] {question}"
response = self.memory.query(question)
return {
"answer": response.answer,
"confidence": response.confidence,
"sources": [n.name for n in response.nodes[:3]],
"escalate": response.confidence < 0.5,
}
def bulk_import(self, documents: list[dict]):
"""Bulk import documents."""
self.memory.ingest_batch(
documents,
max_workers=20,
aggressive=True,
)
self.memory.evolve()
FastAPI Production Service¶
from fastapi import FastAPI, HTTPException, Header, Depends
from pydantic import BaseModel
import os
app = FastAPI(title="Enterprise Memory Service")
# Configuration from environment
config = MemoryConfig(
llm_provider="azure_openai",
llm_api_key=os.getenv("AZURE_OPENAI_KEY"),
llm_api_base=os.getenv("AZURE_ENDPOINT"), # e.g., "https://your-resource.openai.azure.com/"
azure_deployment=os.getenv("AZURE_DEPLOYMENT"),
llm_model="gpt-4",
azure_api_version="2024-02-15-preview",
embedding_provider="azure_openai",
embedding_api_key=os.getenv("AZURE_OPENAI_KEY"),
embedding_api_base=os.getenv("AZURE_ENDPOINT"), # Same endpoint for embeddings
azure_embedding_deployment=os.getenv("AZURE_EMBED_DEPLOYMENT"),
embedding_model="text-embedding-ada-002",
neo4j_uri=os.getenv("NEO4J_URI"),
neo4j_password=os.getenv("NEO4J_PASSWORD"),
redis_url=os.getenv("REDIS_URL"),
)
# Tenant memories
memories = {}
def get_memory(tenant_id: str = Header(..., alias="X-Tenant-ID")) -> GraphMem:
if tenant_id not in memories:
memories[tenant_id] = GraphMem(
config,
user_id=tenant_id,
memory_id="kb",
)
return memories[tenant_id]
class IngestRequest(BaseModel):
content: str
importance: str = "MEDIUM"
class QueryRequest(BaseModel):
question: str
@app.post("/ingest")
async def ingest(
request: IngestRequest,
memory: GraphMem = Depends(get_memory),
):
importance_map = {
"CRITICAL": MemoryImportance.CRITICAL,
"HIGH": MemoryImportance.HIGH,
"MEDIUM": MemoryImportance.MEDIUM,
"LOW": MemoryImportance.LOW,
}
result = memory.ingest(
request.content,
importance=importance_map.get(request.importance),
)
return {"status": "success", "entities": result["entities"]}
@app.post("/query")
async def query(
request: QueryRequest,
memory: GraphMem = Depends(get_memory),
):
response = memory.query(request.question)
return {
"answer": response.answer,
"confidence": response.confidence,
}
@app.post("/evolve")
async def evolve(memory: GraphMem = Depends(get_memory)):
events = memory.evolve()
return {"events": len(events)}
@app.get("/stats")
async def stats(memory: GraphMem = Depends(get_memory)):
return memory.get_stats()
Best Practices¶
- Isolate by tenant - Use
user_idfor complete separation - Use enterprise storage - Neo4j + Redis for scale
- Mark policy as critical - Should never decay
- Evolve after updates - Resolve conflicts immediately
- Audit everything - Include timestamps in metadata
- Use Azure OpenAI - Enterprise compliance
- Set escalation thresholds - Route low-confidence answers to humans