Production Deployment Guide
Complete guide to deploying Agent Patterns in production, covering API design, scaling, monitoring, cost optimization, and security.
Overview
Deploying AI agents to production requires careful consideration of:
API design and wrapping
Scalability and performance
Cost management
Monitoring and observability
Security and safety
Reliability and error handling
API Wrapping
FastAPI Wrapper
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_patterns.patterns import ReActAgent
import logging
app = FastAPI(title="Agent API")
logger = logging.getLogger(__name__)
# Request/Response models
class AgentRequest(BaseModel):
task: str
pattern: str = "react"
max_iterations: int = 5
class AgentResponse(BaseModel):
result: str
success: bool
error: str = None
metadata: dict = {}
# Initialize agents (singleton pattern)
def get_agent(pattern: str):
"""Get or create agent instance."""
llm_configs = {
"thinking": {
"provider": "openai",
"model": "gpt-4",
"temperature": 0.7
}
}
if pattern == "react":
return ReActAgent(llm_configs=llm_configs, tools=get_tools())
elif pattern == "reflection":
return ReflectionAgent(llm_configs=llm_configs)
else:
raise ValueError(f"Unknown pattern: {pattern}")
@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
"""Run agent on task."""
try:
agent = get_agent(request.pattern)
result = agent.run(request.task)
return AgentResponse(
result=result,
success=True,
metadata={
"pattern": request.pattern,
"task_length": len(request.task)
}
)
except Exception as e:
logger.error(f"Agent execution failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Flask Wrapper
from flask import Flask, request, jsonify
from agent_patterns.patterns import ReflectionAgent
import logging
app = Flask(__name__)
logger = logging.getLogger(__name__)
@app.route('/agent/run', methods=['POST'])
def run_agent():
"""Run agent endpoint."""
try:
data = request.get_json()
task = data.get('task')
if not task:
return jsonify({"error": "Task required"}), 400
agent = get_agent()
result = agent.run(task)
return jsonify({
"result": result,
"success": True
})
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
@app.route('/health')
def health():
return jsonify({"status": "healthy"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
Containerization
Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Install agent-patterns
RUN pip install -e .
# Expose port
EXPOSE 8000
# Run application
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: '3.8'
services:
agent-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LOG_LEVEL=INFO
volumes:
- ./logs:/app/logs
restart: unless-stopped
redis:
image: redis:alpine
ports:
- "6379:6379"
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
Scaling Strategies
Horizontal Scaling
# Use load balancer (nginx, AWS ALB, etc.)
# Multiple instances of your API
# docker-compose.yml
services:
agent-api:
build: .
deploy:
replicas: 3 # Run 3 instances
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- agent-api
Async Processing with Celery
from celery import Celery
from agent_patterns.patterns import ReActAgent
# Configure Celery
celery = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@celery.task
def run_agent_async(task, pattern="react"):
"""Run agent asynchronously."""
try:
agent = get_agent(pattern)
result = agent.run(task)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}
# API endpoint
@app.post("/agent/run/async")
async def run_agent_async_endpoint(request: AgentRequest):
"""Queue agent task."""
task = run_agent_async.delay(request.task, request.pattern)
return {"task_id": task.id, "status": "queued"}
@app.get("/agent/result/{task_id}")
async def get_result(task_id: str):
"""Get async task result."""
task = run_agent_async.AsyncResult(task_id)
if task.ready():
return {"status": "completed", "result": task.result}
else:
return {"status": "processing"}
Caching
import redis
import hashlib
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cache_key(task: str, pattern: str) -> str:
"""Generate cache key."""
content = f"{pattern}:{task}"
return hashlib.sha256(content.encode()).hexdigest()
def run_with_cache(task: str, pattern: str, ttl: int = 3600):
"""Run agent with caching."""
cache_key = get_cache_key(task, pattern)
# Check cache
cached = redis_client.get(cache_key)
if cached:
logger.info(f"Cache hit for task: {task[:50]}...")
return json.loads(cached)
# Execute
agent = get_agent(pattern)
result = agent.run(task)
# Cache result
redis_client.setex(
cache_key,
ttl,
json.dumps({"result": result, "cached_at": time.time()})
)
return {"result": result, "cached": False}
Monitoring and Observability
Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge
import time
# Define metrics
request_count = Counter(
'agent_requests_total',
'Total agent requests',
['pattern', 'status']
)
request_duration = Histogram(
'agent_request_duration_seconds',
'Agent request duration',
['pattern']
)
active_requests = Gauge(
'agent_active_requests',
'Active agent requests',
['pattern']
)
# Instrumented endpoint
@app.post("/agent/run")
async def run_agent(request: AgentRequest):
pattern = request.pattern
active_requests.labels(pattern=pattern).inc()
start_time = time.time()
try:
agent = get_agent(pattern)
result = agent.run(request.task)
request_count.labels(pattern=pattern, status='success').inc()
return AgentResponse(result=result, success=True)
except Exception as e:
request_count.labels(pattern=pattern, status='error').inc()
raise
finally:
duration = time.time() - start_time
request_duration.labels(pattern=pattern).observe(duration)
active_requests.labels(pattern=pattern).dec()
Structured Logging
import logging
import json
class JSONFormatter(logging.Formatter):
"""JSON log formatter."""
def format(self, record):
log_data = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, 'extra'):
log_data.update(record.extra)
if record.exc_info:
log_data['exception'] = self.formatException(record.exc_info)
return json.dumps(log_data)
# Configure logging
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Usage
logger.info(
"Agent execution completed",
extra={
"pattern": "react",
"task_id": "12345",
"duration_ms": 1234
}
)
Distributed Tracing
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
jaeger_exporter = JaegerExporter(
agent_host_name='localhost',
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# Traced agent execution
@app.post("/agent/run")
async def run_agent(request: AgentRequest):
with tracer.start_as_current_span("agent_execution") as span:
span.set_attribute("pattern", request.pattern)
span.set_attribute("task_length", len(request.task))
try:
agent = get_agent(request.pattern)
result = agent.run(request.task)
span.set_attribute("success", True)
return AgentResponse(result=result, success=True)
except Exception as e:
span.set_attribute("success", False)
span.record_exception(e)
raise
Cost Optimization
Model Selection Strategy
def get_optimized_config(task_complexity: str):
"""Get cost-optimized configuration."""
if task_complexity == "simple":
return {
"thinking": {
"provider": "openai",
"model": "gpt-3.5-turbo", # Cheaper
"temperature": 0.7
}
}
elif task_complexity == "complex":
return {
"thinking": {
"provider": "openai",
"model": "gpt-4", # Better quality
"temperature": 0.7
}
}
Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/agent/run")
@limiter.limit("10/minute")
async def run_agent(request: Request, agent_request: AgentRequest):
"""Rate limited endpoint."""
# Implementation
Usage Tracking
import sqlite3
def track_usage(user_id: str, pattern: str, tokens_used: int, cost: float):
"""Track API usage."""
conn = sqlite3.connect('usage.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO usage (user_id, pattern, tokens_used, cost, timestamp)
VALUES (?, ?, ?, ?, datetime('now'))
''', (user_id, pattern, tokens_used, cost))
conn.commit()
conn.close()
# In endpoint
@app.post("/agent/run")
async def run_agent(request: AgentRequest, user_id: str):
result = agent.run(request.task)
# Track usage
tokens_used = estimate_tokens(request.task, result)
cost = calculate_cost(tokens_used, model="gpt-4")
track_usage(user_id, request.pattern, tokens_used, cost)
return AgentResponse(result=result, success=True)
Security
API Authentication
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt
security = HTTPBearer()
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
def verify_token(credentials: HTTPAuthorizationCredentials):
"""Verify JWT token."""
try:
payload = jwt.decode(
credentials.credentials,
SECRET_KEY,
algorithms=["HS256"]
)
return payload
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/agent/run")
async def run_agent(
request: AgentRequest,
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""Authenticated endpoint."""
user = verify_token(credentials)
# Run agent
agent = get_agent(request.pattern)
result = agent.run(request.task)
return AgentResponse(result=result, success=True)
Input Sanitization
import re
def sanitize_input(task: str, max_length: int = 10000) -> str:
"""Sanitize user input."""
# Length check
if len(task) > max_length:
raise ValueError(f"Input too long (max {max_length})")
# Remove potentially harmful patterns
forbidden = ["<script>", "eval(", "exec(", "system("]
for pattern in forbidden:
if pattern in task.lower():
raise ValueError(f"Forbidden pattern: {pattern}")
# Remove excessive whitespace
task = re.sub(r'\s+', ' ', task)
return task.strip()
Rate Limiting per User
from collections import defaultdict
import time
class UserRateLimiter:
"""Per-user rate limiter."""
def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
"""Check if user is within rate limit."""
now = time.time()
window_start = now - self.window_seconds
# Remove old requests
self.requests[user_id] = [
t for t in self.requests[user_id]
if t > window_start
]
# Check limit
if len(self.requests[user_id]) >= self.max_requests:
return False
# Add current request
self.requests[user_id].append(now)
return True
rate_limiter = UserRateLimiter(max_requests=100, window_seconds=3600)
@app.post("/agent/run")
async def run_agent(request: AgentRequest, user_id: str):
if not rate_limiter.is_allowed(user_id):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Process request
Deployment Environments
Development
# config/dev.py
ENVIRONMENT = "development"
LOG_LEVEL = "DEBUG"
CACHE_TTL = 60
LLM_CONFIGS = {
"thinking": {
"provider": "openai",
"model": "gpt-3.5-turbo", # Cheaper for dev
"temperature": 0.7
}
}
Staging
# config/staging.py
ENVIRONMENT = "staging"
LOG_LEVEL = "INFO"
CACHE_TTL = 300
LLM_CONFIGS = {
"thinking": {
"provider": "openai",
"model": "gpt-4", # Production model
"temperature": 0.7
}
}
Production
# config/production.py
ENVIRONMENT = "production"
LOG_LEVEL = "WARNING"
CACHE_TTL = 3600
LLM_CONFIGS = {
"thinking": {
"provider": "openai",
"model": "gpt-4-turbo",
"temperature": 0.7
}
}
# Additional production settings
ENABLE_MONITORING = True
ENABLE_TRACING = True
ENABLE_CACHING = True
Best Practices
1. Use Environment Variables
import os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("OPENAI_API_KEY")
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
2. Implement Health Checks
@app.get("/health")
async def health_check():
"""Comprehensive health check."""
checks = {
"api": "healthy",
"llm": check_llm_connection(),
"cache": check_redis_connection(),
"database": check_db_connection()
}
all_healthy = all(v == "healthy" for v in checks.values())
status_code = 200 if all_healthy else 503
return JSONResponse(checks, status_code=status_code)
3. Graceful Shutdown
import signal
import sys
def signal_handler(sig, frame):
"""Handle shutdown signals."""
logger.info("Shutting down gracefully...")
# Cleanup
redis_client.close()
# Close other connections
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
4. Use Circuit Breakers
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=60)
def call_llm(agent, task):
"""Call LLM with circuit breaker."""
return agent.run(task)
5. Monitor Costs
def estimate_cost(input_text: str, output_text: str, model: str) -> float:
"""Estimate API call cost."""
PRICING = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}
}
input_tokens = len(input_text.split()) * 1.3
output_tokens = len(output_text.split()) * 1.3
pricing = PRICING.get(model, PRICING["gpt-4"])
cost = (
(input_tokens / 1000) * pricing["input"] +
(output_tokens / 1000) * pricing["output"]
)
return cost
Next Steps
Review Error Handling for production robustness
See Testing for deployment testing
Explore Best Practices for optimization
Reference
Deployment Checklist
✅ API authentication implemented
✅ Rate limiting configured
✅ Input validation and sanitization
✅ Error handling and logging
✅ Monitoring and metrics
✅ Cost tracking
✅ Caching strategy
✅ Horizontal scaling ready
✅ Health checks implemented
✅ Security measures in place
✅ Documentation updated
✅ Load tested