""" Subagent Router Orchestrates routing between specialized subagents using LangGraph's delegation pattern. """ from typing import Dict, Any, List, Literal from langchain_core.language_models import BaseChatModel from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, MessagesState, START, END from langgraph.prebuilt import ToolNode from langgraph.checkpoint.memory import InMemorySaver from .subagent_config import SubAgentConfig from .subagent_factory import SubAgentFactory async def create_router_agent(all_tools: List[Any], llm: BaseChatModel): """ Create a router agent that orchestrates specialized subagents. Args: all_tools: Full list of available MCP tools llm: Language model for the router Returns: Compiled LangGraph workflow """ async def router_node(state: MessagesState): """Main router that delegates to subagents.""" # Get routing instructions router_prompt = SubAgentConfig.get_router_prompt() # Add system message with routing instructions messages = [SystemMessage(content=router_prompt)] + state["messages"] # Router decides which subagent to use response = await llm.ainvoke(messages) # Extract subagent name from response (you could make this more sophisticated) # For now, the router will use tools to delegate return {"messages": [response]} async def create_subagent_node(subagent_name: str): """Create a node for a specific subagent.""" async def subagent_node(state: MessagesState): # Create the specialized subagent subagent = await SubAgentFactory.create_subagent( subagent_name, all_tools, llm ) # Run the subagent result = await subagent.ainvoke(state) return result return subagent_node # Build the graph workflow = StateGraph(MessagesState) # Add nodes workflow.add_node("router", router_node) workflow.add_node("image_identifier", await create_subagent_node("image_identifier")) workflow.add_node("species_explorer", await create_subagent_node("species_explorer")) workflow.add_node("taxonomy_specialist", await create_subagent_node("taxonomy_specialist")) # Define routing logic def route_to_specialist(state: MessagesState) -> Literal["image_identifier", "species_explorer", "taxonomy_specialist", END]: """Route based on last message content.""" last_message = state["messages"][-1] content = last_message.content.lower() # Simple keyword-based routing (could be improved with LLM classification) if any(word in content for word in ["identify", "what bird", "classify", "image", "photo"]): return "image_identifier" elif any(word in content for word in ["audio", "sound", "call", "song", "find", "search"]): return "species_explorer" elif any(word in content for word in ["family", "families", "conservation", "endangered", "taxonomy"]): return "taxonomy_specialist" else: # Default to species explorer for general queries return "species_explorer" # Connect nodes workflow.add_edge(START, "router") workflow.add_conditional_edges("router", route_to_specialist) workflow.add_edge("image_identifier", END) workflow.add_edge("species_explorer", END) workflow.add_edge("taxonomy_specialist", END) # Compile with memory for conversation context return workflow.compile(checkpointer=InMemorySaver())