"""
|
|
Agentic AI Core - Multi-Reasoning Engine
|
|
Supports: ReAct, Plan-and-Execute, Reflexion
|
|
"""

import os
import json
from datetime import datetime
from typing import List, Dict, Any, Literal, Optional

import httpx
import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

app = FastAPI(
    title="Agentic AI Core",
    version="2.0.0",
    description="Multi-reasoning agent platform with memory and MCP integration",
)

# Configuration
LLM_API_BASE = os.getenv("LLM_API_BASE", "http://litellm:4000/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY", "sk-agent")
DEFAULT_REASONING = os.getenv("DEFAULT_REASONING_MODE", "auto")

# Redis for short-term memory. from_url() connects lazily, so ping() verifies
# the connection up front; if Redis is unreachable, memory is simply disabled.
try:
    redis_client = redis.from_url(
        os.getenv("REDIS_URL", "redis://redis:6379"),
        decode_responses=True,
    )
    redis_client.ping()
except redis.RedisError:
    redis_client = None
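
# Short-term memory layout used by the chat endpoint below: each session keeps
# its recent user messages in a Redis list, newest first, trimmed to 100 items.
#
#   session:<session_id>:history  ->  LPUSH <message>; LTRIM 0 99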


# ==========================================
# DATA MODELS
# ==========================================

class AgentRequest(BaseModel):
    message: str
    session_id: str = Field(default="default", description="Conversation thread ID")
    reasoning_mode: Literal["react", "plan_execute", "reflexion", "auto"] = DEFAULT_REASONING
    context_files: Optional[List[str]] = Field(default_factory=list)
    enable_memory: bool = True
    max_iterations: int = 10


class AgentStep(BaseModel):
    step_number: int
    type: Literal["thought", "action", "observation", "reflection", "plan"]
    content: str
    timestamp: datetime = Field(default_factory=datetime.now)


class AgentResponse(BaseModel):
    response: str
    reasoning_mode: str
    session_id: str
    steps: List[AgentStep] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)


# ==========================================
# UTILITY FUNCTIONS
# ==========================================

async def call_llm(messages: List[Dict], model: str = "auto", tools: Optional[List] = None) -> Dict:
    """Call an LLM through the LiteLLM gateway."""
    async with httpx.AsyncClient() as client:
        payload = {
            "model": model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 4000,
        }
        if tools:
            payload["tools"] = tools

        response = await client.post(
            f"{LLM_API_BASE}/chat/completions",
            headers={"Authorization": f"Bearer {LLM_API_KEY}"},
            json=payload,
            timeout=60.0,
        )
        # Surface gateway errors instead of failing later on a missing key
        response.raise_for_status()
        return response.json()
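
# call_llm() returns the gateway's OpenAI-style chat-completion payload; the
# engines below read the generated text from
# response["choices"][0]["message"]["content"].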


def determine_reasoning_mode(message: str, requested: str) -> str:
    """Auto-select reasoning mode based on task complexity"""
    if requested != "auto":
        return requested

    # Complexity indicators
    complexity_markers = [
        "plan", "design", "architecture", "steps", "implement",
        "build", "create", "project", "complex", "multi-step",
    ]

    msg_lower = message.lower()
    score = sum(1 for marker in complexity_markers if marker in msg_lower)

    if score >= 3 or len(message) > 500:
        return "plan_execute"
    elif "review" in msg_lower or "check" in msg_lower or "verify" in msg_lower:
        return "reflexion"
    else:
        return "react"
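
# Example routing: "Plan the architecture and implementation steps for a new
# service" matches four markers ("plan", "architecture", "steps", "implement")
# and is routed to plan_execute, while "verify this SQL query" hits the
# review/check/verify branch and is routed to reflexion.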


# ==========================================
# REASONING ENGINES
# ==========================================

class ReActEngine:
    """ReAct: Reasoning + Acting in interleaved steps"""

    async def run(self, message: str, session_id: str) -> Dict:
        steps = []

        # Initial thought
        messages = [
            {"role": "system", "content": "You are a ReAct agent. Think step by step and act."},
            {"role": "user", "content": message},
        ]

        response = await call_llm(messages, model="fast-tier")

        steps.append(AgentStep(
            step_number=1,
            type="thought",
            content="Initial analysis and reasoning",
        ))

        content = response["choices"][0]["message"]["content"]

        return {
            "response": content,
            "steps": steps,
            "model_used": "fast-tier",
        }


class PlanAndExecuteEngine:
    """Plan first, then execute step by step"""

    async def run(self, message: str, session_id: str) -> Dict:
        steps = []

        # Planning phase
        plan_messages = [
            {"role": "system", "content": "Create a step-by-step plan to accomplish the task."},
            {"role": "user", "content": f"Create a detailed plan for: {message}"},
        ]

        plan_response = await call_llm(plan_messages, model="volume-tier")
        plan = plan_response["choices"][0]["message"]["content"]

        steps.append(AgentStep(
            step_number=1,
            type="plan",
            content=plan,
        ))

        # Execution phase
        exec_messages = [
            {"role": "system", "content": "Execute the task following the provided plan."},
            {"role": "user", "content": f"Task: {message}\n\nPlan: {plan}\n\nExecute this plan:"},
        ]

        exec_response = await call_llm(exec_messages, model="reasoning-tier")
        result = exec_response["choices"][0]["message"]["content"]

        steps.append(AgentStep(
            step_number=2,
            type="action",
            content="Execution completed",
        ))

        return {
            "response": result,
            "steps": steps,
            "model_used": "reasoning-tier",
            "plan": plan,
        }


class ReflexionEngine:
    """Execute with self-reflection and correction"""

    async def run(self, message: str, session_id: str, max_iterations: int = 2) -> Dict:
        steps = []

        # Initial execution
        messages = [
            {"role": "system", "content": "Solve the problem carefully."},
            {"role": "user", "content": message},
        ]

        response = await call_llm(messages, model="quality-tier")
        answer = response["choices"][0]["message"]["content"]

        steps.append(AgentStep(
            step_number=1,
            type="action",
            content="Initial solution generated",
        ))

        # Reflection phase
        for i in range(max_iterations):
            reflect_messages = [
                {"role": "system", "content": "Critically evaluate the solution for errors or improvements."},
                {"role": "user", "content": f"Problem: {message}\n\nProposed Solution: {answer}\n\nIdentify any issues or improvements:"},
            ]

            reflect_response = await call_llm(reflect_messages, model="claude-haiku")
            reflection = reflect_response["choices"][0]["message"]["content"]

            if "correct" in reflection.lower() and "no issues" in reflection.lower():
                break

            steps.append(AgentStep(
                step_number=2 + i,
                type="reflection",
                content=reflection,
            ))

            # Improve based on reflection
            improve_messages = [
                {"role": "system", "content": "Improve the solution based on the critique."},
                {"role": "user", "content": f"Original: {answer}\n\nIssues found: {reflection}\n\nProvide improved solution:"},
            ]

            improve_response = await call_llm(improve_messages, model="quality-tier")
            answer = improve_response["choices"][0]["message"]["content"]

        return {
            "response": answer,
            "steps": steps,
            "model_used": "quality-tier",
            "iterations": len(steps),
        }


# ==========================================
# API ENDPOINTS
# ==========================================

@app.post("/v1/chat/completions", response_model=AgentResponse)
async def agent_endpoint(request: AgentRequest):
    """
    Main agent endpoint with multiple reasoning strategies:
    - react: Fast iterative reasoning (good for simple tasks)
    - plan_execute: Plan then execute (good for complex tasks)
    - reflexion: Self-correcting (good for accuracy-critical tasks)
    - auto: Automatically select based on task complexity
    """
    # Determine reasoning mode
    mode = determine_reasoning_mode(request.message, request.reasoning_mode)

    # Store message in memory if enabled
    if request.enable_memory and redis_client:
        key = f"session:{request.session_id}:history"
        redis_client.lpush(key, request.message)
        redis_client.ltrim(key, 0, 99)  # Keep last 100 messages

    try:
        # Route to appropriate reasoning engine
        if mode == "react":
            result = await ReActEngine().run(request.message, request.session_id)
        elif mode == "plan_execute":
            result = await PlanAndExecuteEngine().run(request.message, request.session_id)
        elif mode == "reflexion":
            result = await ReflexionEngine().run(
                request.message,
                request.session_id,
                max_iterations=2,
            )
        else:
            # Default fallback
            result = await ReActEngine().run(request.message, request.session_id)

        return AgentResponse(
            response=result["response"],
            reasoning_mode=mode,
            session_id=request.session_id,
            steps=result.get("steps", []),
            metadata={
                "model_used": result.get("model_used", "unknown"),
                "auto_selected": request.reasoning_mode == "auto",
                "timestamp": datetime.now().isoformat(),
            },
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
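
# Illustrative request (assumes the service is reachable on localhost:8080):
#
#   curl -X POST http://localhost:8080/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"message": "Plan the architecture and implementation steps for a new service", "reasoning_mode": "auto"}'
#
# With reasoning_mode="auto", this example would be routed to the plan_execute engine.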
@app.get("/v1/models")
|
|
async def list_models():
|
|
"""List available agent models"""
|
|
return {
|
|
"object": "list",
|
|
"data": [
|
|
{
|
|
"id": "agent/orchestrator",
|
|
"object": "model",
|
|
"created": 1700000000,
|
|
"owned_by": "llm-hub",
|
|
"description": "Auto-selecting orchestrator"
|
|
},
|
|
{
|
|
"id": "agent/react",
|
|
"object": "model",
|
|
"created": 1700000000,
|
|
"owned_by": "llm-hub",
|
|
"description": "ReAct reasoning - fast iterative"
|
|
},
|
|
{
|
|
"id": "agent/plan-execute",
|
|
"object": "model",
|
|
"created": 1700000000,
|
|
"owned_by": "llm-hub",
|
|
"description": "Plan-and-Execute - complex tasks"
|
|
},
|
|
{
|
|
"id": "agent/reflexion",
|
|
"object": "model",
|
|
"created": 1700000000,
|
|
"owned_by": "llm-hub",
|
|
"description": "Reflexion - self-correcting with verification"
|
|
}
|
|
]
|
|
}
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""Health check endpoint"""
|
|
redis_status = "connected" if redis_client and redis_client.ping() else "disconnected"
|
|
|
|
return {
|
|
"status": "healthy",
|
|
"version": "2.0.0",
|
|
"capabilities": ["react", "plan_execute", "reflexion", "mcp", "memory"],
|
|
"default_mode": DEFAULT_REASONING,
|
|
"redis": redis_status,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
@app.get("/sessions/{session_id}/history")
|
|
async def get_session_history(session_id: str, limit: int = 10):
|
|
"""Retrieve conversation history for a session"""
|
|
if not redis_client:
|
|
return {"error": "Redis not available"}
|
|
|
|
key = f"session:{session_id}:history"
|
|
history = redis_client.lrange(key, 0, limit - 1)
|
|
|
|
return {
|
|
"session_id": session_id,
|
|
"history": history,
|
|
"count": len(history)
|
|
}
|
|
|
|
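
# Quick smoke test (illustrative; assumes the service is running on localhost:8080):
#
#   curl http://localhost:8080/health
#   curl "http://localhost:8080/sessions/default/history?limit=5"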


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)