""" Agentic AI Core - Multi-Reasoning Engine Supports: ReAct, Plan-and-Execute, Reflexion """ import os import json from typing import List, Dict, Any, Literal, Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from datetime import datetime import httpx import redis app = FastAPI( title="Agentic AI Core", version="2.0.0", description="Multi-reasoning agent platform with memory and MCP integration" ) # Configuration LLM_API_BASE = os.getenv("LLM_API_BASE", "http://litellm:4000/v1") LLM_API_KEY = os.getenv("LLM_API_KEY", "sk-agent") DEFAULT_REASONING = os.getenv("DEFAULT_REASONING_MODE", "auto") # Redis for short-term memory try: redis_client = redis.from_url( os.getenv("REDIS_URL", "redis://redis:6379"), decode_responses=True ) except: redis_client = None # ========================================== # DATA MODELS # ========================================== class AgentRequest(BaseModel): message: str session_id: str = Field(default="default", description="Conversation thread ID") reasoning_mode: Literal["react", "plan_execute", "reflexion", "auto"] = DEFAULT_REASONING context_files: Optional[List[str]] = Field(default_factory=list) enable_memory: bool = True max_iterations: int = 10 class AgentStep(BaseModel): step_number: int type: Literal["thought", "action", "observation", "reflection", "plan"] content: str timestamp: datetime = Field(default_factory=datetime.now) class AgentResponse(BaseModel): response: str reasoning_mode: str session_id: str steps: List[AgentStep] = Field(default_factory=list) metadata: Dict[str, Any] = Field(default_factory=dict) # ========================================== # UTILITY FUNCTIONS # ========================================== async def call_llm(messages: List[Dict], model: str = "auto", tools: Optional[List] = None) -> Dict: """Call LLM through LiteLLM gateway""" async with httpx.AsyncClient() as client: payload = { "model": model, "messages": messages, "temperature": 0.7, "max_tokens": 4000 } if tools: payload["tools"] = tools response = await client.post( f"{LLM_API_BASE}/chat/completions", headers={"Authorization": f"Bearer {LLM_API_KEY}"}, json=payload, timeout=60.0 ) return response.json() def determine_reasoning_mode(message: str, requested: str) -> str: """Auto-select reasoning mode based on task complexity""" if requested != "auto": return requested # Complexity indicators complexity_markers = [ "plan", "design", "architecture", "steps", "implement", "build", "create", "project", "complex", "multi-step" ] msg_lower = message.lower() score = sum(1 for marker in complexity_markers if marker in msg_lower) if score >= 3 or len(message) > 500: return "plan_execute" elif "review" in msg_lower or "check" in msg_lower or "verify" in msg_lower: return "reflexion" else: return "react" # ========================================== # REASONING ENGINES # ========================================== class ReActEngine: """ReAct: Reasoning + Acting in interleaved steps""" async def run(self, message: str, session_id: str) -> Dict: steps = [] # Initial thought messages = [ {"role": "system", "content": "You are a ReAct agent. 
# ==========================================
# REASONING ENGINES
# ==========================================

class ReActEngine:
    """ReAct: Reasoning + Acting in interleaved steps."""

    async def run(self, message: str, session_id: str) -> Dict:
        steps = []

        # Initial thought
        messages = [
            {"role": "system", "content": "You are a ReAct agent. Think step by step and act."},
            {"role": "user", "content": message}
        ]
        response = await call_llm(messages, model="fast-tier")
        steps.append(AgentStep(
            step_number=1,
            type="thought",
            content="Initial analysis and reasoning"
        ))

        content = response["choices"][0]["message"]["content"]
        return {
            "response": content,
            "steps": steps,
            "model_used": "fast-tier"
        }

class PlanAndExecuteEngine:
    """Plan first, then execute step by step."""

    async def run(self, message: str, session_id: str) -> Dict:
        steps = []

        # Planning phase
        plan_messages = [
            {"role": "system", "content": "Create a step-by-step plan to accomplish the task."},
            {"role": "user", "content": f"Create a detailed plan for: {message}"}
        ]
        plan_response = await call_llm(plan_messages, model="volume-tier")
        plan = plan_response["choices"][0]["message"]["content"]
        steps.append(AgentStep(
            step_number=1,
            type="plan",
            content=plan
        ))

        # Execution phase
        exec_messages = [
            {"role": "system", "content": "Execute the task following the provided plan."},
            {"role": "user", "content": f"Task: {message}\n\nPlan: {plan}\n\nExecute this plan:"}
        ]
        exec_response = await call_llm(exec_messages, model="reasoning-tier")
        result = exec_response["choices"][0]["message"]["content"]
        steps.append(AgentStep(
            step_number=2,
            type="action",
            content="Execution completed"
        ))

        return {
            "response": result,
            "steps": steps,
            "model_used": "reasoning-tier",
            "plan": plan
        }

class ReflexionEngine:
    """Execute with self-reflection and correction."""

    async def run(self, message: str, session_id: str, max_iterations: int = 2) -> Dict:
        steps = []

        # Initial execution
        messages = [
            {"role": "system", "content": "Solve the problem carefully."},
            {"role": "user", "content": message}
        ]
        response = await call_llm(messages, model="quality-tier")
        answer = response["choices"][0]["message"]["content"]
        steps.append(AgentStep(
            step_number=1,
            type="action",
            content="Initial solution generated"
        ))

        # Reflection loop: critique the answer with a cheap model,
        # stop early if the critique reports no issues
        for i in range(max_iterations):
            reflect_messages = [
                {"role": "system", "content": "Critically evaluate the solution for errors or improvements."},
                {"role": "user", "content": f"Problem: {message}\n\nProposed Solution: {answer}\n\nIdentify any issues or improvements:"}
            ]
            reflect_response = await call_llm(reflect_messages, model="fast-tier")
            reflection = reflect_response["choices"][0]["message"]["content"]

            if "correct" in reflection.lower() and "no issues" in reflection.lower():
                break

            steps.append(AgentStep(
                step_number=2 + i,
                type="reflection",
                content=reflection
            ))

            # Improve based on reflection
            improve_messages = [
                {"role": "system", "content": "Improve the solution based on the critique."},
                {"role": "user", "content": f"Original: {answer}\n\nIssues found: {reflection}\n\nProvide improved solution:"}
            ]
            improve_response = await call_llm(improve_messages, model="quality-tier")
            answer = improve_response["choices"][0]["message"]["content"]

        return {
            "response": answer,
            "steps": steps,
            "model_used": "quality-tier",
            "iterations": len(steps)
        }
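# The engines can also be exercised directly, without the HTTP layer, which is
# handy for local experiments. A minimal sketch, assuming the LiteLLM gateway
# is reachable (the prompt and session id are illustrative):
#
#   import asyncio
#   result = asyncio.run(ReflexionEngine().run("Is 1021 prime?", "scratch"))
#   print(result["response"], "after", result["iterations"], "steps")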
# ==========================================
# API ENDPOINTS
# ==========================================

@app.post("/v1/chat/completions", response_model=AgentResponse)
async def agent_endpoint(request: AgentRequest):
    """
    Main agent endpoint with multiple reasoning strategies:
    - react: Fast iterative reasoning (good for simple tasks)
    - plan_execute: Plan then execute (good for complex tasks)
    - reflexion: Self-correcting (good for accuracy-critical tasks)
    - auto: Automatically select based on task complexity
    """
    # Determine reasoning mode
    mode = determine_reasoning_mode(request.message, request.reasoning_mode)

    # Store message in memory if enabled
    if request.enable_memory and redis_client:
        key = f"session:{request.session_id}:history"
        redis_client.lpush(key, request.message)
        redis_client.ltrim(key, 0, 99)  # Keep last 100 messages

    try:
        # Route to the appropriate reasoning engine
        if mode == "react":
            result = await ReActEngine().run(request.message, request.session_id)
        elif mode == "plan_execute":
            result = await PlanAndExecuteEngine().run(request.message, request.session_id)
        elif mode == "reflexion":
            result = await ReflexionEngine().run(
                request.message,
                request.session_id,
                max_iterations=2
            )
        else:
            # Default fallback
            result = await ReActEngine().run(request.message, request.session_id)

        return AgentResponse(
            response=result["response"],
            reasoning_mode=mode,
            session_id=request.session_id,
            steps=result.get("steps", []),
            metadata={
                "model_used": result.get("model_used", "unknown"),
                "auto_selected": request.reasoning_mode == "auto",
                "timestamp": datetime.now().isoformat()
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/v1/models")
async def list_models():
    """List available agent models."""
    return {
        "object": "list",
        "data": [
            {
                "id": "agent/orchestrator",
                "object": "model",
                "created": 1700000000,
                "owned_by": "llm-hub",
                "description": "Auto-selecting orchestrator"
            },
            {
                "id": "agent/react",
                "object": "model",
                "created": 1700000000,
                "owned_by": "llm-hub",
                "description": "ReAct reasoning - fast iterative"
            },
            {
                "id": "agent/plan-execute",
                "object": "model",
                "created": 1700000000,
                "owned_by": "llm-hub",
                "description": "Plan-and-Execute - complex tasks"
            },
            {
                "id": "agent/reflexion",
                "object": "model",
                "created": 1700000000,
                "owned_by": "llm-hub",
                "description": "Reflexion - self-correcting with verification"
            }
        ]
    }

@app.get("/health")
async def health():
    """Health check endpoint."""
    try:
        redis_status = "connected" if redis_client and redis_client.ping() else "disconnected"
    except redis.RedisError:
        redis_status = "disconnected"
    return {
        "status": "healthy",
        "version": "2.0.0",
        "capabilities": ["react", "plan_execute", "reflexion", "mcp", "memory"],
        "default_mode": DEFAULT_REASONING,
        "redis": redis_status,
        "timestamp": datetime.now().isoformat()
    }

@app.get("/sessions/{session_id}/history")
async def get_session_history(session_id: str, limit: int = 10):
    """Retrieve conversation history for a session."""
    if not redis_client:
        raise HTTPException(status_code=503, detail="Redis not available")
    key = f"session:{session_id}:history"
    history = redis_client.lrange(key, 0, limit - 1)
    return {
        "session_id": session_id,
        "history": history,
        "count": len(history)
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
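# Example call against the running service (a sketch; the host and message are
# illustrative, the port matches the uvicorn default above):
#
#   import httpx
#   reply = httpx.post(
#       "http://localhost:8080/v1/chat/completions",
#       json={"message": "Plan and implement a multi-step build pipeline",
#             "session_id": "demo", "reasoning_mode": "auto"},
#       timeout=120.0,
#   )
#   print(reply.json()["reasoning_mode"])
#
# That prompt contains the markers "plan", "implement", "build", and
# "multi-step", so determine_reasoning_mode routes it to plan_execute.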