# llm-hub/services/agent-core/main.py
"""
Agentic AI Core - Multi-Reasoning Engine
Supports: ReAct, Plan-and-Execute, Reflexion
"""
import os
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional

import httpx
import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

app = FastAPI(
title="Agentic AI Core",
version="2.0.0",
description="Multi-reasoning agent platform with memory and MCP integration"
)
# Configuration
LLM_API_BASE = os.getenv("LLM_API_BASE", "http://litellm:4000/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY", "sk-agent")
DEFAULT_REASONING = os.getenv("DEFAULT_REASONING_MODE", "auto")
# Redis for short-term memory (redis.from_url is lazy, so ping to verify the connection)
try:
    redis_client = redis.from_url(
        os.getenv("REDIS_URL", "redis://redis:6379"),
        decode_responses=True,
    )
    redis_client.ping()
except redis.RedisError:
    redis_client = None
# ==========================================
# DATA MODELS
# ==========================================
class AgentRequest(BaseModel):
message: str
session_id: str = Field(default="default", description="Conversation thread ID")
reasoning_mode: Literal["react", "plan_execute", "reflexion", "auto"] = DEFAULT_REASONING
context_files: Optional[List[str]] = Field(default_factory=list)
enable_memory: bool = True
max_iterations: int = 10
class AgentStep(BaseModel):
step_number: int
type: Literal["thought", "action", "observation", "reflection", "plan"]
content: str
timestamp: datetime = Field(default_factory=datetime.now)
class AgentResponse(BaseModel):
response: str
reasoning_mode: str
session_id: str
steps: List[AgentStep] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)
# ==========================================
# UTILITY FUNCTIONS
# ==========================================
async def call_llm(messages: List[Dict], model: str = "auto", tools: Optional[List] = None) -> Dict:
    """Call an LLM through the LiteLLM gateway and return the raw completion payload"""
    payload = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        "max_tokens": 4000,
    }
    if tools:
        payload["tools"] = tools
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(
            f"{LLM_API_BASE}/chat/completions",
            headers={"Authorization": f"Bearer {LLM_API_KEY}"},
            json=payload,
        )
        response.raise_for_status()  # surface gateway errors instead of silently returning an error body
        return response.json()
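# Example (inside an async context; "fast-tier" is one of the gateway aliases the
# engines below rely on):
#   completion = await call_llm([{"role": "user", "content": "ping"}], model="fast-tier")
#   text = completion["choices"][0]["message"]["content"]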
def determine_reasoning_mode(message: str, requested: str) -> str:
"""Auto-select reasoning mode based on task complexity"""
if requested != "auto":
return requested
# Complexity indicators
complexity_markers = [
"plan", "design", "architecture", "steps", "implement",
"build", "create", "project", "complex", "multi-step"
]
msg_lower = message.lower()
score = sum(1 for marker in complexity_markers if marker in msg_lower)
if score >= 3 or len(message) > 500:
return "plan_execute"
elif "review" in msg_lower or "check" in msg_lower or "verify" in msg_lower:
return "reflexion"
else:
return "react"
# ==========================================
# REASONING ENGINES
# ==========================================
class ReActEngine:
    """ReAct: reasoning + acting in interleaved steps (single-pass sketch)"""
    async def run(self, message: str, session_id: str) -> Dict:
        steps = []
        messages = [
            {"role": "system", "content": "You are a ReAct agent. Think step by step and act."},
            {"role": "user", "content": message},
        ]
        response = await call_llm(messages, model="fast-tier")
        content = response["choices"][0]["message"]["content"]
        # Record the model's actual reasoning rather than a placeholder string
        steps.append(AgentStep(
            step_number=1,
            type="thought",
            content=content,
        ))
        return {
            "response": content,
            "steps": steps,
            "model_used": "fast-tier",
        }
class PlanAndExecuteEngine:
"""Plan first, then execute step by step"""
async def run(self, message: str, session_id: str) -> Dict:
steps = []
# Planning phase
plan_messages = [
{"role": "system", "content": "Create a step-by-step plan to accomplish the task."},
{"role": "user", "content": f"Create a detailed plan for: {message}"}
]
plan_response = await call_llm(plan_messages, model="volume-tier")
plan = plan_response["choices"][0]["message"]["content"]
steps.append(AgentStep(
step_number=1,
type="plan",
content=plan
))
# Execution phase
exec_messages = [
{"role": "system", "content": "Execute the task following the provided plan."},
{"role": "user", "content": f"Task: {message}\n\nPlan: {plan}\n\nExecute this plan:"}
]
exec_response = await call_llm(exec_messages, model="reasoning-tier")
result = exec_response["choices"][0]["message"]["content"]
steps.append(AgentStep(
step_number=2,
type="action",
content="Execution completed"
))
return {
"response": result,
"steps": steps,
"model_used": "reasoning-tier",
"plan": plan
}
class ReflexionEngine:
    """Execute with self-reflection and correction"""
    async def run(self, message: str, session_id: str, max_iterations: int = 2) -> Dict:
        steps = []
        # Initial execution
        messages = [
            {"role": "system", "content": "Solve the problem carefully."},
            {"role": "user", "content": message},
        ]
        response = await call_llm(messages, model="quality-tier")
        answer = response["choices"][0]["message"]["content"]
        steps.append(AgentStep(
            step_number=1,
            type="action",
            content="Initial solution generated",
        ))
        # Reflection phase: critique with the cheap tier, improve with the strong one
        for i in range(max_iterations):
            reflect_messages = [
                {"role": "system", "content": "Critically evaluate the solution for errors or improvements."},
                {"role": "user", "content": f"Problem: {message}\n\nProposed Solution: {answer}\n\nIdentify any issues or improvements:"},
            ]
            reflect_response = await call_llm(reflect_messages, model="fast-tier")
            reflection = reflect_response["choices"][0]["message"]["content"]
            # Crude stop heuristic: accept the answer once the critic signs off
            if "correct" in reflection.lower() and "no issues" in reflection.lower():
                break
            steps.append(AgentStep(
                step_number=2 + i,
                type="reflection",
                content=reflection,
            ))
            # Improve based on reflection
            improve_messages = [
                {"role": "system", "content": "Improve the solution based on the critique."},
                {"role": "user", "content": f"Original: {answer}\n\nIssues found: {reflection}\n\nProvide improved solution:"},
            ]
            improve_response = await call_llm(improve_messages, model="quality-tier")
            answer = improve_response["choices"][0]["message"]["content"]
        return {
            "response": answer,
            "steps": steps,
            "model_used": "quality-tier",
            "iterations": len(steps) - 1,  # reflection rounds, excluding the initial action step
        }
# ==========================================
# API ENDPOINTS
# ==========================================
@app.post("/v1/chat/completions", response_model=AgentResponse)
async def agent_endpoint(request: AgentRequest):
"""
Main agent endpoint with multiple reasoning strategies:
- react: Fast iterative reasoning (good for simple tasks)
- plan_execute: Plan then execute (good for complex tasks)
- reflexion: Self-correcting (good for accuracy-critical tasks)
- auto: Automatically select based on task complexity
"""
    # Determine reasoning mode
    mode = determine_reasoning_mode(request.message, request.reasoning_mode)
    # Store the message in short-term memory if enabled
    if request.enable_memory and redis_client:
        try:
            key = f"session:{request.session_id}:history"
            redis_client.lpush(key, request.message)
            redis_client.ltrim(key, 0, 99)  # Keep the last 100 messages
        except redis.RedisError:
            pass  # Memory is best-effort; do not fail the request over it
    try:
        # Route to the appropriate reasoning engine
        if mode == "react":
            result = await ReActEngine().run(request.message, request.session_id)
        elif mode == "plan_execute":
            result = await PlanAndExecuteEngine().run(request.message, request.session_id)
        elif mode == "reflexion":
            result = await ReflexionEngine().run(
                request.message,
                request.session_id,
                max_iterations=request.max_iterations,  # honor the request field instead of hardcoding 2
            )
        else:
            # Defensive fallback (determine_reasoning_mode never returns "auto")
            result = await ReActEngine().run(request.message, request.session_id)
return AgentResponse(
response=result["response"],
reasoning_mode=mode,
session_id=request.session_id,
steps=result.get("steps", []),
metadata={
"model_used": result.get("model_used", "unknown"),
"auto_selected": request.reasoning_mode == "auto",
"timestamp": datetime.now().isoformat()
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/v1/models")
async def list_models():
"""List available agent models"""
return {
"object": "list",
"data": [
{
"id": "agent/orchestrator",
"object": "model",
"created": 1700000000,
"owned_by": "llm-hub",
"description": "Auto-selecting orchestrator"
},
{
"id": "agent/react",
"object": "model",
"created": 1700000000,
"owned_by": "llm-hub",
"description": "ReAct reasoning - fast iterative"
},
{
"id": "agent/plan-execute",
"object": "model",
"created": 1700000000,
"owned_by": "llm-hub",
"description": "Plan-and-Execute - complex tasks"
},
{
"id": "agent/reflexion",
"object": "model",
"created": 1700000000,
"owned_by": "llm-hub",
"description": "Reflexion - self-correcting with verification"
}
]
}
@app.get("/health")
async def health():
    """Health check endpoint"""
    try:
        redis_status = "connected" if redis_client and redis_client.ping() else "disconnected"
    except redis.RedisError:
        redis_status = "disconnected"  # a dead Redis should degrade the report, not crash the check
    return {
        "status": "healthy",
        "version": "2.0.0",
        "capabilities": ["react", "plan_execute", "reflexion", "mcp", "memory"],
        "default_mode": DEFAULT_REASONING,
        "redis": redis_status,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/sessions/{session_id}/history")
async def get_session_history(session_id: str, limit: int = 10):
    """Retrieve conversation history for a session (most recent first)"""
    if not redis_client:
        raise HTTPException(status_code=503, detail="Redis not available")
    key = f"session:{session_id}:history"
    history = redis_client.lrange(key, 0, limit - 1)
    return {
        "session_id": session_id,
        "history": history,
        "count": len(history),
    }
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
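# Equivalent CLI invocation (assuming this file is the module root):
#   uvicorn main:app --host 0.0.0.0 --port 8080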