BirdScopeAI / langgraph_agent /subagent_supervisor.py
facemelter's picture
Added audio_finder subagent to Specialist supervisor
128f5d1 verified
"""
Subagent Supervisor
Uses create_supervisor() from the langgraph_supervisor package for LLM-based routing between specialists.
"""
from typing import List, Any
from langchain_core.language_models import BaseChatModel
from langchain.agents import create_agent
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import InMemorySaver
from .subagent_config import SubAgentConfig
from .subagent_factory import SubAgentFactory
async def create_supervisor_workflow(all_tools: List[Any], llm: BaseChatModel, provider: str = "openai"):
    """
    Build and compile the supervisor graph that fronts the specialist subagents.

    Three subagents are built one after another through
    SubAgentFactory.create_subagent(), handed to create_supervisor() together
    with the router prompt for the given provider, and the resulting workflow
    is compiled with an in-memory checkpointer.

    Args:
        all_tools: Complete tool list; passed unchanged to every subagent build.
        llm: Chat model shared by the supervisor and all subagents.
        provider: Provider name forwarded to the subagent factory and to
            SubAgentConfig.get_router_prompt() (defaults to "openai").

    Returns:
        Whatever workflow.compile(checkpointer=InMemorySaver()) returns, i.e.
        the compiled supervisor workflow.
    """
    # Function-scope import: the dependency is only resolved when a workflow
    # is actually requested.
    from langgraph_supervisor import create_supervisor

    print("[SUPERVISOR]: Creating specialist agents...")
    # Config names, listed in the order the agents are given to the supervisor.
    # NOTE(review): the third slot (the audio-finder agent) is built from the
    # "generalist" config -- confirm that this is the intended config name.
    subagent_names = ("image_identifier", "taxonomy_specialist", "generalist")
    # Each build is awaited before the next one starts (sequential, not
    # concurrent).
    specialists = [
        await SubAgentFactory.create_subagent(
            subagent_name, all_tools, llm, provider=provider
        )
        for subagent_name in subagent_names
    ]

    print("[SUPERVISOR]: Creating supervisor orchestrator...")
    router_prompt = SubAgentConfig.get_router_prompt(provider=provider)
    # The agent list goes in as the first positional argument.
    workflow = create_supervisor(specialists, model=llm, prompt=router_prompt)

    print("[SUPERVISOR]: Compiling workflow with memory...")
    # A fresh in-memory checkpointer is attached at compile time.
    memory = InMemorySaver()
    return workflow.compile(checkpointer=memory)