Open Deep Research
- Generated on: May 27, 2025 at 19:31:46 PDT
- Source Directory: https://github.com/langchain-ai/open_deep_research/tree/main/src/open_deep_research
- Total Files: 7 Python files
File Index
- __init__.py - Package initialization with imports/exports (72 bytes, 2 lines)
- configuration.py - Runtime configuration schema for LangGraph StateGraph with multi-provider LLM support (3183 bytes, 74 lines)
- graph.py - LangGraph StateGraph with Send API parallel execution, human-in-the-loop interrupts, and conditional routing (20842 bytes, 492 lines)
- multi_agent.py - LangGraph multi-agent supervisor architecture with tool-calling agents and Command-based handoffs (13253 bytes, 330 lines)
- prompts.py - Structured output prompts for LLM planning, writing, and multi-agent coordination (14332 bytes, 395 lines)
- state.py - Pydantic state schemas and TypedDict definitions for LangGraph workflow persistence (2449 bytes, 64 lines)
- utils.py - Multi-provider search API integrations with async execution and content deduplication (63345 bytes, 1470 lines)
__init__.py
File Metadata:
- Path:
__init__.py
- Size: 72 bytes
- Lines: 2
- Purpose: Package initialization with imports/exports
"""Planning, research, and report generation."""
__version__ = "0.0.15"
configuration.py
File Metadata:
- Path:
configuration.py
- Size: 3183 bytes
- Lines: 74
- Purpose: Runtime configuration schema for LangGraph StateGraph with multi-provider LLM support
import os
from enum import Enum
from dataclasses import dataclass, fields
from typing import Any, Optional, Dict
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.runnables import RunnableConfig
DEFAULT_REPORT_STRUCTURE = """Use this structure to create a report on the user-provided topic:
1. Introduction (no research needed)
- Brief overview of the topic area
2. Main Body Sections:
- Each section should focus on a sub-topic of the user-provided topic
3. Conclusion
- Aim for 1 structural element (either a list or table) that distills the main body sections
- Provide a concise summary of the report"""
class SearchAPI(Enum):
PERPLEXITY = "perplexity"
TAVILY = "tavily"
EXA = "exa"
ARXIV = "arxiv"
PUBMED = "pubmed"
LINKUP = "linkup"
DUCKDUCKGO = "duckduckgo"
GOOGLESEARCH = "googlesearch"
@dataclass(kw_only=True)
class Configuration:
"""The configurable fields for the chatbot."""
# Common configuration
report_structure: str = DEFAULT_REPORT_STRUCTURE # Report layout; defaults to DEFAULT_REPORT_STRUCTURE
search_api: SearchAPI = SearchAPI.TAVILY # Default to TAVILY
search_api_config: Optional[Dict[str, Any]] = None
# Graph-specific configuration
number_of_queries: int = 2 # Number of search queries to generate per iteration
max_search_depth: int = 2 # Maximum number of reflection + search iterations
planner_provider: str = "openai" # Defaults to OpenAI as provider
planner_model: str = "o3-mini" # Defaults to o3-mini, add "-thinking" to enable thinking mode
writer_provider: str = "openai" # Defaults to OpenAI as provider
writer_model: str = "o3-mini" # Defaults to o3-mini
## planner_provider: str = "anthropic" # Defaults to Anthropic as provider
## planner_model: str = "claude-3-7-sonnet-latest" # Defaults to claude-3-7-sonnet-latest
planner_model_kwargs: Optional[Dict[str, Any]] = None # kwargs for planner_model
## writer_provider: str = "anthropic" # Defaults to Anthropic as provider
## writer_model: str = "claude-3-5-sonnet-latest" # Defaults to claude-3-5-sonnet-latest
writer_model_kwargs: Optional[Dict[str, Any]] = None # kwargs for writer_model
# Multi-agent specific configuration
supervisor_model: str = "openai:gpt-4.1" # Model for supervisor agent in multi-agent setup
researcher_model: str = "openai:gpt-4.1" # Model for research agents in multi-agent setup
@classmethod
def from_runnable_config(
cls, config: Optional[RunnableConfig] = None
) -> "Configuration":
"""Create a Configuration instance from a RunnableConfig."""
configurable = (
config["configurable"] if config and "configurable" in config else {}
)
values: dict[str, Any] = {
f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
for f in fields(cls)
if f.init
}
return cls(**{k: v for k, v in values.items() if v})
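A minimal usage sketch (illustrative, not part of the file; it assumes no matching environment variables such as SEARCH_API are set): values under the "configurable" key of a RunnableConfig override the dataclass defaults, and unset values fall back to them.
config = {"configurable": {"search_api": SearchAPI.PERPLEXITY, "number_of_queries": 3}}
cfg = Configuration.from_runnable_config(config)
assert cfg.search_api == SearchAPI.PERPLEXITY  # overridden via RunnableConfig
assert cfg.max_search_depth == 2               # dataclass default retained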
graph.py
File Metadata:
- Path:
graph.py
- Size: 20842 bytes
- Lines: 492
- Purpose: LangGraph StateGraph with Send API parallel execution, human-in-the-loop interrupts, and conditional routing
from typing import Literal
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.constants import Send
from langgraph.graph import START, END, StateGraph
from langgraph.types import interrupt, Command
from open_deep_research.state import (
ReportStateInput,
ReportStateOutput,
Sections,
ReportState,
SectionState,
SectionOutputState,
Queries,
Feedback
)
from open_deep_research.prompts import (
report_planner_query_writer_instructions,
report_planner_instructions,
query_writer_instructions,
section_writer_instructions,
final_section_writer_instructions,
section_grader_instructions,
section_writer_inputs
)
from open_deep_research.configuration import Configuration
from open_deep_research.utils import (
format_sections,
get_config_value,
get_search_params,
select_and_execute_search
)
## Nodes --
async def generate_report_plan(state: ReportState, config: RunnableConfig):
"""Generate the initial report plan with sections.
This node:
1. Gets configuration for the report structure and search parameters
2. Generates search queries to gather context for planning
3. Performs web searches using those queries
4. Uses an LLM to generate a structured plan with sections
Args:
state: Current graph state containing the report topic
config: Configuration for models, search APIs, etc.
Returns:
Dict containing the generated sections
"""
# Inputs
topic = state["topic"]
# Get list of feedback on the report plan
feedback_list = state.get("feedback_on_report_plan", [])
# Concatenate feedback on the report plan into a single string
feedback = " /// ".join(feedback_list) if feedback_list else ""
# Get configuration
configurable = Configuration.from_runnable_config(config)
report_structure = configurable.report_structure
number_of_queries = configurable.number_of_queries
search_api = get_config_value(configurable.search_api)
search_api_config = configurable.search_api_config or {} # Get the config dict, default to empty
params_to_pass = get_search_params(search_api, search_api_config) # Filter parameters
# Convert JSON object to string if necessary
if isinstance(report_structure, dict):
report_structure = str(report_structure)
# Set writer model (model used for query writing)
writer_provider = get_config_value(configurable.writer_provider)
writer_model_name = get_config_value(configurable.writer_model)
writer_model_kwargs = get_config_value(configurable.writer_model_kwargs or {})
writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider, model_kwargs=writer_model_kwargs)
structured_llm = writer_model.with_structured_output(Queries)
# Format system instructions
system_instructions_query = report_planner_query_writer_instructions.format(topic=topic, report_organization=report_structure, number_of_queries=number_of_queries)
# Generate queries
results = await structured_llm.ainvoke([SystemMessage(content=system_instructions_query),
HumanMessage(content="Generate search queries that will help with planning the sections of the report.")])
# Web search
query_list = [query.search_query for query in results.queries]
# Search the web with parameters
source_str = await select_and_execute_search(search_api, query_list, params_to_pass)
# Format system instructions
system_instructions_sections = report_planner_instructions.format(topic=topic, report_organization=report_structure, context=source_str, feedback=feedback)
# Set the planner
planner_provider = get_config_value(configurable.planner_provider)
planner_model = get_config_value(configurable.planner_model)
planner_model_kwargs = get_config_value(configurable.planner_model_kwargs or {})
# Report planner instructions
planner_message = """Generate the sections of the report. Your response must include a 'sections' field containing a list of sections.
Each section must have: name, description, research, and content fields."""
# Run the planner
if planner_model == "claude-3-7-sonnet-latest":
# Allocate a thinking budget for claude-3-7-sonnet-latest as the planner model
planner_llm = init_chat_model(model=planner_model,
model_provider=planner_provider,
max_tokens=20_000
# thinking={"type": "enabled", "budget_tokens": 16_000} # Disabled due to Anthropic structured output incompatibility
)
else:
# With other models, thinking tokens are not specifically allocated
planner_llm = init_chat_model(model=planner_model,
model_provider=planner_provider,
model_kwargs=planner_model_kwargs)
# Generate the report sections
structured_llm = planner_llm.with_structured_output(Sections)
report_sections = await structured_llm.ainvoke([SystemMessage(content=system_instructions_sections),
HumanMessage(content=planner_message)])
# Get sections
sections = report_sections.sections
return {"sections": sections}
def human_feedback(state: ReportState, config: RunnableConfig) -> Command[Literal["generate_report_plan","build_section_with_web_research"]]:
"""Get human feedback on the report plan and route to next steps.
This node:
1. Formats the current report plan for human review
2. Gets feedback via an interrupt
3. Routes to either:
- Section writing if plan is approved
- Plan regeneration if feedback is provided
Args:
state: Current graph state with sections to review
config: Configuration for the workflow
Returns:
Command to either regenerate plan or start section writing
"""
# Get sections
topic = state["topic"]
sections = state['sections']
sections_str = "\n\n".join(
f"Section: {section.name}\n"
f"Description: {section.description}\n"
f"Research needed: {'Yes' if section.research else 'No'}\n"
for section in sections
)
# Get feedback on the report plan from interrupt
interrupt_message = f"""Please provide feedback on the following report plan.
\n\n{sections_str}\n
\nDoes the report plan meet your needs?\nPass 'true' to approve the report plan.\nOr, provide feedback to regenerate the report plan:"""
feedback = interrupt(interrupt_message)
# If the user approves the report plan, kick off section writing
if isinstance(feedback, bool) and feedback is True:
# Treat this as approve and kick off section writing
return Command(goto=[
Send("build_section_with_web_research", {"topic": topic, "section": s, "search_iterations": 0})
for s in sections
if s.research
])
# If the user provides feedback, regenerate the report plan
elif isinstance(feedback, str):
# Treat this as feedback and append it to the existing list
return Command(goto="generate_report_plan",
update={"feedback_on_report_plan": [feedback]})
else:
raise TypeError(f"Interrupt value of type {type(feedback)} is not supported.")
async def generate_queries(state: SectionState, config: RunnableConfig):
"""Generate search queries for researching a specific section.
This node uses an LLM to generate targeted search queries based on the
section topic and description.
Args:
state: Current state containing section details
config: Configuration including number of queries to generate
Returns:
Dict containing the generated search queries
"""
# Get state
topic = state["topic"]
section = state["section"]
# Get configuration
configurable = Configuration.from_runnable_config(config)
number_of_queries = configurable.number_of_queries
# Generate queries
writer_provider = get_config_value(configurable.writer_provider)
writer_model_name = get_config_value(configurable.writer_model)
writer_model_kwargs = get_config_value(configurable.writer_model_kwargs or {})
writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider, model_kwargs=writer_model_kwargs)
structured_llm = writer_model.with_structured_output(Queries)
# Format system instructions
system_instructions = query_writer_instructions.format(topic=topic,
section_topic=section.description,
number_of_queries=number_of_queries)
# Generate queries
queries = await structured_llm.ainvoke([SystemMessage(content=system_instructions),
HumanMessage(content="Generate search queries on the provided topic.")])
return {"search_queries": queries.queries}
async def search_web(state: SectionState, config: RunnableConfig):
"""Execute web searches for the section queries.
This node:
1. Takes the generated queries
2. Executes searches using configured search API
3. Formats results into usable context
Args:
state: Current state with search queries
config: Search API configuration
Returns:
Dict with search results and updated iteration count
"""
# Get state
search_queries = state["search_queries"]
# Get configuration
configurable = Configuration.from_runnable_config(config)
search_api = get_config_value(configurable.search_api)
search_api_config = configurable.search_api_config or {} # Get the config dict, default to empty
params_to_pass = get_search_params(search_api, search_api_config) # Filter parameters
# Web search
query_list = [query.search_query for query in search_queries]
# Search the web with parameters
source_str = await select_and_execute_search(search_api, query_list, params_to_pass)
return {"source_str": source_str, "search_iterations": state["search_iterations"] + 1}
async def write_section(state: SectionState, config: RunnableConfig) -> Command[Literal[END, "search_web"]]:
"""Write a section of the report and evaluate if more research is needed.
This node:
1. Writes section content using search results
2. Evaluates the quality of the section
3. Either:
- Completes the section if quality passes
- Triggers more research if quality fails
Args:
state: Current state with search results and section info
config: Configuration for writing and evaluation
Returns:
Command to either complete section or do more research
"""
# Get state
topic = state["topic"]
section = state["section"]
source_str = state["source_str"]
# Get configuration
configurable = Configuration.from_runnable_config(config)
# Format system instructions
section_writer_inputs_formatted = section_writer_inputs.format(topic=topic,
section_name=section.name,
section_topic=section.description,
context=source_str,
section_content=section.content)
# Generate section
writer_provider = get_config_value(configurable.writer_provider)
writer_model_name = get_config_value(configurable.writer_model)
writer_model_kwargs = get_config_value(configurable.writer_model_kwargs or {})
writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider, model_kwargs=writer_model_kwargs)
section_content = await writer_model.ainvoke([SystemMessage(content=section_writer_instructions),
HumanMessage(content=section_writer_inputs_formatted)])
# Write content to the section object
section.content = section_content.content
# Grade prompt
section_grader_message = ("Grade the report and consider follow-up questions for missing information. "
"If the grade is 'pass', return empty strings for all follow-up queries. "
"If the grade is 'fail', provide specific search queries to gather missing information.")
section_grader_instructions_formatted = section_grader_instructions.format(topic=topic,
section_topic=section.description,
section=section.content,
number_of_follow_up_queries=configurable.number_of_queries)
# Use planner model for reflection
planner_provider = get_config_value(configurable.planner_provider)
planner_model = get_config_value(configurable.planner_model)
planner_model_kwargs = get_config_value(configurable.planner_model_kwargs or {})
if planner_model == "claude-3-7-sonnet-latest":
# Allocate a thinking budget for claude-3-7-sonnet-latest as the planner model
reflection_model = init_chat_model(model=planner_model,
model_provider=planner_provider,
max_tokens=20_000
# thinking={"type": "enabled", "budget_tokens": 16_000} # Disabled due to Anthropic structured output incompatibility
).with_structured_output(Feedback)
else:
reflection_model = init_chat_model(model=planner_model,
model_provider=planner_provider, model_kwargs=planner_model_kwargs).with_structured_output(Feedback)
# Generate feedback
feedback = await reflection_model.ainvoke([SystemMessage(content=section_grader_instructions_formatted),
HumanMessage(content=section_grader_message)])
# If the section is passing or the max search depth is reached, publish the section to completed sections
if feedback.grade == "pass" or state["search_iterations"] >= configurable.max_search_depth:
# Publish the section to completed sections
return Command(
update={"completed_sections": [section]},
goto=END
)
# Update the existing section with new content and update search queries
else:
return Command(
update={"search_queries": feedback.follow_up_queries, "section": section},
goto="search_web"
)
async def write_final_sections(state: SectionState, config: RunnableConfig):
"""Write sections that don't require research using completed sections as context.
This node handles sections like conclusions or summaries that build on
the researched sections rather than requiring direct research.
Args:
state: Current state with completed sections as context
config: Configuration for the writing model
Returns:
Dict containing the newly written section
"""
# Get configuration
configurable = Configuration.from_runnable_config(config)
# Get state
topic = state["topic"]
section = state["section"]
completed_report_sections = state["report_sections_from_research"]
# Format system instructions
system_instructions = final_section_writer_instructions.format(topic=topic, section_name=section.name, section_topic=section.description, context=completed_report_sections)
# Generate section
writer_provider = get_config_value(configurable.writer_provider)
writer_model_name = get_config_value(configurable.writer_model)
writer_model_kwargs = get_config_value(configurable.writer_model_kwargs or {})
writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider, model_kwargs=writer_model_kwargs)
section_content = await writer_model.ainvoke([SystemMessage(content=system_instructions),
HumanMessage(content="Generate a report section based on the provided sources.")])
# Write content to section
section.content = section_content.content
# Write the updated section to completed sections
return {"completed_sections": [section]}
def gather_completed_sections(state: ReportState):
"""Format completed sections as context for writing final sections.
This node takes all completed research sections and formats them into
a single context string for writing summary sections.
Args:
state: Current state with completed sections
Returns:
Dict with formatted sections as context
"""
# List of completed sections
completed_sections = state["completed_sections"]
# Format completed section to str to use as context for final sections
completed_report_sections = format_sections(completed_sections)
return {"report_sections_from_research": completed_report_sections}
def compile_final_report(state: ReportState):
"""Compile all sections into the final report.
This node:
1. Gets all completed sections
2. Orders them according to original plan
3. Combines them into the final report
Args:
state: Current state with all completed sections
Returns:
Dict containing the complete report
"""
# Get sections
sections = state["sections"]
completed_sections = {s.name: s.content for s in state["completed_sections"]}
# Update sections with completed content while maintaining original order
for section in sections:
section.content = completed_sections[section.name]
# Compile final report
all_sections = "\n\n".join([s.content for s in sections])
return {"final_report": all_sections}
def initiate_final_section_writing(state: ReportState):
"""Create parallel tasks for writing non-research sections.
This edge function identifies sections that don't need research and
creates parallel writing tasks for each one.
Args:
state: Current state with all sections and research context
Returns:
List of Send commands for parallel section writing
"""
# Kick off section writing in parallel via Send() API for any sections that do not require research
return [
Send("write_final_sections", {"topic": state["topic"], "section": s, "report_sections_from_research": state["report_sections_from_research"]})
for s in state["sections"]
if not s.research
]
# Report section sub-graph --
# Add nodes
section_builder = StateGraph(SectionState, output=SectionOutputState)
section_builder.add_node("generate_queries", generate_queries)
section_builder.add_node("search_web", search_web)
section_builder.add_node("write_section", write_section)
# Add edges
section_builder.add_edge(START, "generate_queries")
section_builder.add_edge("generate_queries", "search_web")
section_builder.add_edge("search_web", "write_section")
# Outer graph for initial report plan compiling results from each section --
# Add nodes
builder = StateGraph(ReportState, input=ReportStateInput, output=ReportStateOutput, config_schema=Configuration)
builder.add_node("generate_report_plan", generate_report_plan)
builder.add_node("human_feedback", human_feedback)
builder.add_node("build_section_with_web_research", section_builder.compile())
builder.add_node("gather_completed_sections", gather_completed_sections)
builder.add_node("write_final_sections", write_final_sections)
builder.add_node("compile_final_report", compile_final_report)
# Add edges
builder.add_edge(START, "generate_report_plan")
builder.add_edge("generate_report_plan", "human_feedback")
builder.add_edge("build_section_with_web_research", "gather_completed_sections")
builder.add_conditional_edges("gather_completed_sections", initiate_final_section_writing, ["write_final_sections"])
builder.add_edge("write_final_sections", "compile_final_report")
builder.add_edge("compile_final_report", END)
graph = builder.compile()
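A minimal usage sketch (illustrative, not part of the file; it assumes the relevant model and search API keys are set in the environment): because the human_feedback node calls interrupt, the graph needs a checkpointer, and the paused run is resumed with a Command carrying either True (approve) or a feedback string.
import asyncio
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

async def run_example():
    app = builder.compile(checkpointer=MemorySaver())
    thread = {"configurable": {"thread_id": "example-1"}}
    # Stream until human_feedback interrupts with the proposed plan
    async for event in app.astream({"topic": "LangGraph Send API"}, thread):
        print(event)
    # Resume with True to approve the plan, or a string to regenerate it
    async for event in app.astream(Command(resume=True), thread):
        print(event)

asyncio.run(run_example())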
multi_agent.py
File Metadata:
- Path:
multi_agent.py
- Size: 13253 bytes
- Lines: 330
- Purpose: LangGraph multi-agent supervisor architecture with tool-calling agents and Command-based handoffs
from typing import List, Annotated, TypedDict, operator, Literal
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.types import Command, Send
from langgraph.graph import START, END, StateGraph
from open_deep_research.configuration import Configuration
from open_deep_research.utils import get_config_value, tavily_search, duckduckgo_search
from open_deep_research.prompts import SUPERVISOR_INSTRUCTIONS, RESEARCH_INSTRUCTIONS
## Tools factory - will be initialized based on configuration
def get_search_tool(config: RunnableConfig):
"""Get the appropriate search tool based on configuration"""
configurable = Configuration.from_runnable_config(config)
search_api = get_config_value(configurable.search_api)
# TODO: Configure other search functions as tools
if search_api.lower() == "tavily":
# Use Tavily search tool
return tavily_search
elif search_api.lower() == "duckduckgo":
# Use the DuckDuckGo search tool
return duckduckgo_search
else:
# Raise NotImplementedError for unsupported search APIs
raise NotImplementedError(
f"The search API '{search_api}' is not yet supported in the multi-agent implementation. "
f"Currently, only Tavily and DuckDuckGo are supported. Please use the graph-based implementation in "
f"src/open_deep_research/graph.py for other search APIs, or set search_api to 'tavily' or 'duckduckgo'."
)
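# Example (hypothetical config): resolve the search tool for a run.
#   search_tool = get_search_tool({"configurable": {"search_api": "duckduckgo"}})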
@tool
class Section(BaseModel):
name: str = Field(
description="Name for this section of the report.",
)
description: str = Field(
description="Research scope for this section of the report.",
)
content: str = Field(
description="The content of the section."
)
@tool
class Sections(BaseModel):
sections: List[str] = Field(
description="Sections of the report.",
)
@tool
class Introduction(BaseModel):
name: str = Field(
description="Name for the report.",
)
content: str = Field(
description="The content of the introduction, giving an overview of the report."
)
@tool
class Conclusion(BaseModel):
name: str = Field(
description="Name for the conclusion of the report.",
)
content: str = Field(
description="The content of the conclusion, summarizing the report."
)
## State
class ReportStateOutput(TypedDict):
final_report: str # Final report
class ReportState(MessagesState):
sections: list[str] # List of report sections
completed_sections: Annotated[list, operator.add] # Send() API key
final_report: str # Final report
class SectionState(MessagesState):
section: str # Report section
completed_sections: list[Section] # Final key we duplicate in outer state for Send() API
class SectionOutputState(TypedDict):
completed_sections: list[Section] # Final key we duplicate in outer state for Send() API
# Tool lists will be built dynamically based on configuration
def get_supervisor_tools(config: RunnableConfig):
"""Get supervisor tools based on configuration"""
search_tool = get_search_tool(config)
tool_list = [search_tool, Sections, Introduction, Conclusion]
return tool_list, {tool.name: tool for tool in tool_list}
def get_research_tools(config: RunnableConfig):
"""Get research tools based on configuration"""
search_tool = get_search_tool(config)
tool_list = [search_tool, Section]
return tool_list, {tool.name: tool for tool in tool_list}
async def supervisor(state: ReportState, config: RunnableConfig):
"""LLM decides whether to call a tool or not"""
# Messages
messages = state["messages"]
# Get configuration
configurable = Configuration.from_runnable_config(config)
supervisor_model = get_config_value(configurable.supervisor_model)
# Initialize the model
llm = init_chat_model(model=supervisor_model)
# If sections have been completed, but we don't yet have the final report, then we need to initiate writing the introduction and conclusion
if state.get("completed_sections") and not state.get("final_report"):
research_complete_message = {"role": "user", "content": "Research is complete. Now write the introduction and conclusion for the report. Here are the completed main body sections: \n\n" + "\n\n".join([s.content for s in state["completed_sections"]])}
messages = messages + [research_complete_message]
# Get tools based on configuration
supervisor_tool_list, _ = get_supervisor_tools(config)
# Invoke
return {
"messages": [
await llm.bind_tools(supervisor_tool_list, parallel_tool_calls=False).ainvoke(
[
{"role": "system",
"content": SUPERVISOR_INSTRUCTIONS,
}
]
+ messages
)
]
}
async def supervisor_tools(state: ReportState, config: RunnableConfig) -> Command[Literal["supervisor", "research_team", "__end__"]]:
"""Performs the tool call and sends to the research agent"""
result = []
sections_list = []
intro_content = None
conclusion_content = None
# Get tools based on configuration
_, supervisor_tools_by_name = get_supervisor_tools(config)
# First process all tool calls to ensure we respond to each one (required for OpenAI)
for tool_call in state["messages"][-1].tool_calls:
# Get the tool
tool = supervisor_tools_by_name[tool_call["name"]]
# Perform the tool call - use ainvoke for async tools
if hasattr(tool, 'ainvoke'):
observation = await tool.ainvoke(tool_call["args"])
else:
observation = tool.invoke(tool_call["args"])
# Append to messages
result.append({"role": "tool",
"content": observation,
"name": tool_call["name"],
"tool_call_id": tool_call["id"]})
# Store special tool results for processing after all tools have been called
if tool_call["name"] == "Sections":
sections_list = observation.sections
elif tool_call["name"] == "Introduction":
# Format introduction with proper H1 heading if not already formatted
if not observation.content.startswith("# "):
intro_content = f"# {observation.name}\n\n{observation.content}"
else:
intro_content = observation.content
elif tool_call["name"] == "Conclusion":
# Format conclusion with proper H2 heading if not already formatted
if not observation.content.startswith("## "):
conclusion_content = f"## {observation.name}\n\n{observation.content}"
else:
conclusion_content = observation.content
# After processing all tool calls, decide what to do next
if sections_list:
# Send the sections to the research agents
return Command(goto=[Send("research_team", {"section": s}) for s in sections_list], update={"messages": result})
elif intro_content:
# Store introduction while waiting for conclusion
# Append to messages to guide the LLM to write conclusion next
result.append({"role": "user", "content": "Introduction written. Now write a conclusion section."})
return Command(goto="supervisor", update={"final_report": intro_content, "messages": result})
elif conclusion_content:
# Get all sections and combine in proper order: Introduction, Body Sections, Conclusion
intro = state.get("final_report", "")
body_sections = "\n\n".join([s.content for s in state["completed_sections"]])
# Assemble final report in correct order
complete_report = f"{intro}\n\n{body_sections}\n\n{conclusion_content}"
# Append to messages to indicate completion
result.append({"role": "user", "content": "Report is now complete with introduction, body sections, and conclusion."})
return Command(goto="supervisor", update={"final_report": complete_report, "messages": result})
else:
# Default case (for search tools, etc.)
return Command(goto="supervisor", update={"messages": result})
async def supervisor_should_continue(state: ReportState) -> Literal["supervisor_tools", END]:
"""Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""
messages = state["messages"]
last_message = messages[-1]
# If the LLM makes a tool call, then perform an action
if last_message.tool_calls:
return "supervisor_tools"
# Else end because the supervisor asked a question or is finished
else:
return END
async def research_agent(state: SectionState, config: RunnableConfig):
"""LLM decides whether to call a tool or not"""
# Get configuration
configurable = Configuration.from_runnable_config(config)
researcher_model = get_config_value(configurable.researcher_model)
# Initialize the model
llm = init_chat_model(model=researcher_model)
# Get tools based on configuration
research_tool_list, _ = get_research_tools(config)
return {
"messages": [
# Enforce tool calling to either perform more search or call the Section tool to write the section
await llm.bind_tools(research_tool_list).ainvoke(
[
{"role": "system",
"content": RESEARCH_INSTRUCTIONS.format(section_description=state["section"])
}
]
+ state["messages"]
)
]
}
async def research_agent_tools(state: SectionState, config: RunnableConfig):
"""Performs the tool call and route to supervisor or continue the research loop"""
result = []
completed_section = None
# Get tools based on configuration
_, research_tools_by_name = get_research_tools(config)
# Process all tool calls first (required for OpenAI)
for tool_call in state["messages"][-1].tool_calls:
# Get the tool
tool = research_tools_by_name[tool_call["name"]]
# Perform the tool call - use ainvoke for async tools
if hasattr(tool, 'ainvoke'):
observation = await tool.ainvoke(tool_call["args"])
else:
observation = tool.invoke(tool_call["args"])
# Append to messages
result.append({"role": "tool",
"content": observation,
"name": tool_call["name"],
"tool_call_id": tool_call["id"]})
# Store the section observation if a Section tool was called
if tool_call["name"] == "Section":
completed_section = observation
# After processing all tools, decide what to do next
if completed_section:
# Write the completed section to state and return to the supervisor
return {"messages": result, "completed_sections": [completed_section]}
else:
# Continue the research loop for search tools, etc.
return {"messages": result}
async def research_agent_should_continue(state: SectionState) -> Literal["research_agent_tools", END]:
"""Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""
messages = state["messages"]
last_message = messages[-1]
# If the LLM makes a tool call, then perform an action
if last_message.tool_calls:
return "research_agent_tools"
else:
return END
"""Build the multi-agent workflow"""
# Research agent workflow
research_builder = StateGraph(SectionState, output=SectionOutputState, config_schema=Configuration)
research_builder.add_node("research_agent", research_agent)
research_builder.add_node("research_agent_tools", research_agent_tools)
research_builder.add_edge(START, "research_agent")
research_builder.add_conditional_edges(
"research_agent",
research_agent_should_continue,
{
# Name returned by should_continue : Name of next node to visit
"research_agent_tools": "research_agent_tools",
END: END,
},
)
research_builder.add_edge("research_agent_tools", "research_agent")
# Supervisor workflow
supervisor_builder = StateGraph(ReportState, input=MessagesState, output=ReportStateOutput, config_schema=Configuration)
supervisor_builder.add_node("supervisor", supervisor)
supervisor_builder.add_node("supervisor_tools", supervisor_tools)
supervisor_builder.add_node("research_team", research_builder.compile())
# Flow of the supervisor agent
supervisor_builder.add_edge(START, "supervisor")
supervisor_builder.add_conditional_edges(
"supervisor",
supervisor_should_continue,
{
# Name returned by should_continue : Name of next node to visit
"supervisor_tools": "supervisor_tools",
END: END,
},
)
supervisor_builder.add_edge("research_team", "supervisor")
graph = supervisor_builder.compile()
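A minimal usage sketch (illustrative, not part of the file; model and Tavily credentials are assumed to be configured): the supervisor graph takes MessagesState-style input, so a run starts from a user message, and final_report may be absent if the supervisor is still awaiting clarification.
import asyncio

async def main():
    result = await graph.ainvoke(
        {"messages": [{"role": "user", "content": "Write a report on the Model Context Protocol."}]},
        config={"configurable": {"search_api": "tavily"}},
    )
    print(result.get("final_report", "(supervisor is awaiting clarification)"))

asyncio.run(main())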
prompts.py
File Metadata:
- Path:
prompts.py
- Size: 14332 bytes
- Lines: 395
- Purpose: Structured output prompts for LLM planning, writing, and multi-agent coordination
report_planner_query_writer_instructions="""You are performing research for a report.
<Report topic>
{topic}
</Report topic>
<Report organization>
{report_organization}
</Report organization>
<Task>
Your goal is to generate {number_of_queries} web search queries that will help gather information for planning the report sections.
The queries should:
1. Be related to the Report topic
2. Help satisfy the requirements specified in the report organization
Make the queries specific enough to find high-quality, relevant sources while covering the breadth needed for the report structure.
</Task>
<Format>
Call the Queries tool
</Format>
"""
report_planner_instructions="""I want a plan for a report that is concise and focused.
<Report topic>
The topic of the report is:
{topic}
</Report topic>
<Report organization>
The report should follow this organization:
{report_organization}
</Report organization>
<Context>
Here is context to use to plan the sections of the report:
{context}
</Context>
<Task>
Generate a list of sections for the report. Your plan should be tight and focused with NO overlapping sections or unnecessary filler.
For example, a good report structure might look like:
1/ intro
2/ overview of topic A
3/ overview of topic B
4/ comparison between A and B
5/ conclusion
Each section should have the fields:
- Name - Name for this section of the report.
- Description - Brief overview of the main topics covered in this section.
- Research - Whether to perform web research for this section of the report. IMPORTANT: Main body sections (not intro/conclusion) MUST have Research=True. A report must have AT LEAST 2-3 sections with Research=True to be useful.
- Content - The content of the section, which you will leave blank for now.
Integration guidelines:
- Include examples and implementation details within main topic sections, not as separate sections
- Ensure each section has a distinct purpose with no content overlap
- Combine related concepts rather than separating them
- CRITICAL: Every section MUST be directly relevant to the main topic
- Avoid tangential or loosely related sections that don't directly address the core topic
Before submitting, review your structure to ensure it has no redundant sections and follows a logical flow.
</Task>
<Feedback>
Here is feedback on the report structure from review (if any):
{feedback}
</Feedback>
<Format>
Call the Sections tool
</Format>
"""
query_writer_instructions="""You are an expert technical writer crafting targeted web search queries that will gather comprehensive information for writing a technical report section.
<Report topic>
{topic}
</Report topic>
<Section topic>
{section_topic}
</Section topic>
<Task>
Your goal is to generate {number_of_queries} search queries that will help gather comprehensive information about the section topic.
The queries should:
1. Be related to the topic
2. Examine different aspects of the topic
Make the queries specific enough to find high-quality, relevant sources.
</Task>
<Format>
Call the Queries tool
</Format>
"""
section_writer_instructions = """Write one section of a research report.
<Task>
1. Review the report topic, section name, and section topic carefully.
2. If present, review any existing section content.
3. Then, look at the provided Source material.
4. Decide which sources you will use to write the report section.
5. Write the report section and list your sources.
</Task>
<Writing Guidelines>
- If existing section content is not populated, write from scratch
- If existing section content is populated, synthesize it with the source material
- Strict 150-200 word limit
- Use simple, clear language
- Use short paragraphs (2-3 sentences max)
- Use ## for section title (Markdown format)
</Writing Guidelines>
<Citation Rules>
- Assign each unique URL a single citation number in your text
- End with ### Sources that lists each source with corresponding numbers
- IMPORTANT: Number sources sequentially without gaps (1,2,3,4...) in the final list regardless of which sources you choose
- Example format:
[1] Source Title: URL
[2] Source Title: URL
</Citation Rules>
<Final Check>
1. Verify that EVERY claim is grounded in the provided Source material
2. Confirm each URL appears ONLY ONCE in the Source list
3. Verify that sources are numbered sequentially (1,2,3...) without any gaps
</Final Check>
"""
section_writer_inputs="""
<Report topic>
{topic}
</Report topic>
<Section name>
{section_name}
</Section name>
<Section topic>
{section_topic}
</Section topic>
<Existing section content (if populated)>
{section_content}
</Existing section content>
<Source material>
{context}
</Source material>
"""
section_grader_instructions = """Review a report section relative to the specified topic:
<Report topic>
{topic}
</Report topic>
<section topic>
{section_topic}
</section topic>
<section content>
{section}
</section content>
<task>
Evaluate whether the section content adequately addresses the section topic.
If the section content does not adequately address the section topic, generate {number_of_follow_up_queries} follow-up search queries to gather missing information.
</task>
<format>
Call the Feedback tool and output with the following schema:
grade: Literal["pass","fail"] = Field(
description="Evaluation result indicating whether the response meets requirements ('pass') or needs revision ('fail')."
)
follow_up_queries: List[SearchQuery] = Field(
description="List of follow-up search queries.",
)
</format>
"""
final_section_writer_instructions="""You are an expert technical writer crafting a section that synthesizes information from the rest of the report.
<Report topic>
{topic}
</Report topic>
<Section name>
{section_name}
</Section name>
<Section topic>
{section_topic}
</Section topic>
<Available report content>
{context}
</Available report content>
<Task>
1. Section-Specific Approach:
For Introduction:
- Use # for report title (Markdown format)
- 50-100 word limit
- Write in simple and clear language
- Focus on the core motivation for the report in 1-2 paragraphs
- Use a clear narrative arc to introduce the report
- Include NO structural elements (no lists or tables)
- No sources section needed
For Conclusion/Summary:
- Use ## for section title (Markdown format)
- 100-150 word limit
- For comparative reports:
* Must include a focused comparison table using Markdown table syntax
* Table should distill insights from the report
* Keep table entries clear and concise
- For non-comparative reports:
* Only use ONE structural element IF it helps distill the points made in the report:
* Either a focused table comparing items present in the report (using Markdown table syntax)
* Or a short list using proper Markdown list syntax:
- Use `*` or `-` for unordered lists
- Use `1.` for ordered lists
- Ensure proper indentation and spacing
- End with specific next steps or implications
- No sources section needed
2. Writing Approach:
- Use concrete details over general statements
- Make every word count
- Focus on your single most important point
</Task>
<Quality Checks>
- For introduction: 50-100 word limit, # for report title, no structural elements, no sources section
- For conclusion: 100-150 word limit, ## for section title, only ONE structural element at most, no sources section
- Markdown format
- Do not include word count or any preamble in your response
</Quality Checks>"""
## Supervisor
SUPERVISOR_INSTRUCTIONS = """
You are scoping research for a report based on a user-provided topic.
### Your responsibilities:
1. **Gather Background Information**
Based upon the user's topic, use the `enhanced_tavily_search` tool to collect relevant information about the topic.
- You MUST perform ONLY ONE search to gather comprehensive context
- Create a highly targeted search query that will yield the most valuable information
- Take time to analyze and synthesize the search results before proceeding
- Do not proceed to the next step until you have an understanding of the topic
2. **Clarify the Topic**
After your initial research, engage with the user to clarify any questions that arose.
- Ask ONE SET of follow-up questions based on what you learned from your searches
- Do not proceed until you fully understand the topic, goals, constraints, and any preferences
- Synthesize what you've learned so far before asking questions
- You MUST engage in at least one clarification exchange with the user before proceeding
3. **Define Report Structure**
Only after completing both research AND clarification with the user:
- Use the `Sections` tool to define a list of report sections
- Each section should be a written description with: a section name and a section research plan
- Do not include sections for introductions or conclusions (We'll add these later)
- Ensure sections are scoped to be independently researchable
- Base your sections on both the search results AND user clarifications
- Format your sections as a list of strings, with each string having the scope of research for that section.
4. **Assemble the Final Report**
When all sections are returned:
- IMPORTANT: First check your previous messages to see what you've already completed
- If you haven't created an introduction yet, use the `Introduction` tool to generate one
- Set content to include report title with a single # (H1 level) at the beginning
- Example: "# [Report Title]\n\n[Introduction content...]"
- After the introduction, use the `Conclusion` tool to summarize key insights
- Set content to include conclusion title with ## (H2 level) at the beginning
- Example: "## Conclusion\n\n[Conclusion content...]"
- Only use ONE structural element IF it helps distill the points made in the report:
- Either a focused table comparing items present in the report (using Markdown table syntax)
- Or a short list using proper Markdown list syntax:
- Use `*` or `-` for unordered lists
- Use `1.` for ordered lists
- Ensure proper indentation and spacing
- Do not call the same tool twice - check your message history
### Additional Notes:
- You are a reasoning model. Think through problems step-by-step before acting.
- IMPORTANT: Do not rush to create the report structure. Gather information thoroughly first.
- Use multiple searches to build a complete picture before drawing conclusions.
- Maintain a clear, informative, and professional tone throughout."""
RESEARCH_INSTRUCTIONS = """
You are a researcher responsible for completing a specific section of a report.
### Your goals:
1. **Understand the Section Scope**
Begin by reviewing the section scope of work. This defines your research focus. Use it as your objective.
<Section Description>
{section_description}
</Section Description>
2. **Strategic Research Process**
Follow this precise research strategy:
a) **First Query**: Begin with a SINGLE, well-crafted search query with `enhanced_tavily_search` that directly addresses the core of the section topic.
- Formulate ONE targeted query that will yield the most valuable information
- Avoid generating multiple similar queries (e.g., 'Benefits of X', 'Advantages of X', 'Why use X')
- Example: "Model Context Protocol developer benefits and use cases" is better than separate queries for benefits and use cases
b) **Analyze Results Thoroughly**: After receiving search results:
- Carefully read and analyze ALL provided content
- Identify specific aspects that are well-covered and those that need more information
- Assess how well the current information addresses the section scope
c) **Follow-up Research**: If needed, conduct targeted follow-up searches:
- Create ONE follow-up query that addresses SPECIFIC missing information
- Example: If general benefits are covered but technical details are missing, search for "Model Context Protocol technical implementation details"
- AVOID redundant queries that would return similar information
d) **Research Completion**: Continue this focused process until you have:
- Comprehensive information addressing ALL aspects of the section scope
- At least 3 high-quality sources with diverse perspectives
- Both breadth (covering all aspects) and depth (specific details) of information
3. **Use the Section Tool**
Only after thorough research, write a high-quality section using the Section tool:
- `name`: The title of the section
- `description`: The scope of research you completed (brief, 1-2 sentences)
- `content`: The completed body of text for the section, which MUST:
- Begin with the section title formatted as "## [Section Title]" (H2 level with ##)
- Be formatted in Markdown style
- Be MAXIMUM 200 words (strictly enforce this limit)
- End with a "### Sources" subsection (H3 level with ###) containing a numbered list of URLs used
- Use clear, concise language with bullet points where appropriate
- Include relevant facts, statistics, or expert opinions
Example format for content:
## [Section Title]
[Body text in markdown format, maximum 200 words…]
### Sources
1. [URL 1]
2. [URL 2]
3. [URL 3]
---
### Research Decision Framework
Before each search query or when writing the section, think through:
1. **What information do I already have?**
- Review all information gathered so far
- Identify the key insights and facts already discovered
2. **What information is still missing?**
- Identify specific gaps in knowledge relative to the section scope
- Prioritize the most important missing information
3. **What is the most effective next action?**
- Determine if another search is needed (and what specific aspect to search for)
- Or if enough information has been gathered to write a comprehensive section
---
### Notes:
- Focus on QUALITY over QUANTITY of searches
- Each search should have a clear, distinct purpose
- Do not write introductions or conclusions unless explicitly part of your section
- Keep a professional, factual tone
- Always follow markdown formatting
- Stay within the 200 word limit for the main content
"""
state.py
File Metadata:
- Path:
state.py
- Size: 2449 bytes
- Lines: 64
- Purpose: Pydantic state schemas and TypedDict definitions for LangGraph workflow persistence
from typing import Annotated, List, TypedDict, Literal
from pydantic import BaseModel, Field
import operator
class Section(BaseModel):
name: str = Field(
description="Name for this section of the report.",
)
description: str = Field(
description="Brief overview of the main topics and concepts to be covered in this section.",
)
research: bool = Field(
description="Whether to perform web research for this section of the report."
)
content: str = Field(
description="The content of the section."
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="Sections of the report.",
)
class SearchQuery(BaseModel):
search_query: str = Field(None, description="Query for web search.")
class Queries(BaseModel):
queries: List[SearchQuery] = Field(
description="List of search queries.",
)
class Feedback(BaseModel):
grade: Literal["pass","fail"] = Field(
description="Evaluation result indicating whether the response meets requirements ('pass') or needs revision ('fail')."
)
follow_up_queries: List[SearchQuery] = Field(
description="List of follow-up search queries.",
)
class ReportStateInput(TypedDict):
topic: str # Report topic
class ReportStateOutput(TypedDict):
final_report: str # Final report
class ReportState(TypedDict):
topic: str # Report topic
feedback_on_report_plan: Annotated[list[str], operator.add] # List of feedback on the report plan
sections: list[Section] # List of report sections
completed_sections: Annotated[list, operator.add] # Send() API key
report_sections_from_research: str # String of any completed sections from research to write final sections
final_report: str # Final report
class SectionState(TypedDict):
topic: str # Report topic
section: Section # Report section
search_iterations: int # Number of search iterations done
search_queries: list[SearchQuery] # List of search queries
source_str: str # String of formatted source content from web search
report_sections_from_research: str # String of any completed sections from research to write final sections
completed_sections: list[Section] # Final key we duplicate in outer state for Send() API
class SectionOutputState(TypedDict):
completed_sections: list[Section] # Final key we duplicate in outer state for Send() API
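A small illustrative sketch (not part of the file) of how these schemas interlock: the planner emits Sections of Section objects, and the grader returns Feedback whose follow_up_queries reuse SearchQuery.
section = Section(
    name="Overview",
    description="High-level survey of the topic.",
    research=True,
    content="",  # left blank until a writer node fills it in
)
feedback = Feedback(
    grade="fail",
    follow_up_queries=[SearchQuery(search_query="topic internals and architecture")],
)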
utils.py
File Metadata:
- Path:
utils.py
- Size: 63345 bytes
- Lines: 1470
- Purpose: Multi-provider search API integrations with async execution and content deduplication
import os
import asyncio
import requests
import random
import concurrent
import aiohttp
import httpx
import time
from typing import List, Optional, Dict, Any, Union, Literal
from urllib.parse import unquote
from exa_py import Exa
from linkup import LinkupClient
from tavily import AsyncTavilyClient
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient as AsyncAzureAISearchClient
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from markdownify import markdownify
from langchain_community.retrievers import ArxivRetriever
from langchain_community.utilities.pubmed import PubMedAPIWrapper
from langchain_core.tools import tool
from langsmith import traceable
from open_deep_research.state import Section
def get_config_value(value):
"""
Helper function to handle string, dict, and enum cases of configuration values
"""
if isinstance(value, str):
return value
elif isinstance(value, dict):
return value
else:
return value.value
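# Example: get_config_value(SearchAPI.TAVILY) -> "tavily";
# plain strings and config dicts pass through unchanged.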
def get_search_params(search_api: str, search_api_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
Filters the search_api_config dictionary to include only parameters accepted by the specified search API.
Args:
search_api (str): The search API identifier (e.g., "exa", "tavily").
search_api_config (Optional[Dict[str, Any]]): The configuration dictionary for the search API.
Returns:
Dict[str, Any]: A dictionary of parameters to pass to the search function.
"""
# Define accepted parameters for each search API
SEARCH_API_PARAMS = {
"exa": ["max_characters", "num_results", "include_domains", "exclude_domains", "subpages"],
"tavily": ["max_results", "topic"],
"perplexity": [], # Perplexity accepts no additional parameters
"arxiv": ["load_max_docs", "get_full_documents", "load_all_available_meta"],
"pubmed": ["top_k_results", "email", "api_key", "doc_content_chars_max"],
"linkup": ["depth"],
"googlesearch": ["max_results"],
}
# Get the list of accepted parameters for the given search API
accepted_params = SEARCH_API_PARAMS.get(search_api, [])
# If no config provided, return an empty dict
if not search_api_config:
return {}
# Filter the config to only include accepted parameters
return {k: v for k, v in search_api_config.items() if k in accepted_params}
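# Example: keys not accepted by the chosen API are silently dropped.
#   get_search_params("tavily", {"max_results": 5, "subpages": 2})
#   -> {"max_results": 5}   ("subpages" is only accepted by Exa)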
def deduplicate_and_format_sources(search_response, max_tokens_per_source=5000, include_raw_content=True):
"""
Takes a list of search responses and formats them into a readable string.
Limits the raw_content to approximately max_tokens_per_source tokens.
Args:
search_response: List of search response dicts, each containing:
- query: str
- results: List of dicts with fields:
- title: str
- url: str
- content: str
- score: float
- raw_content: str|None
max_tokens_per_source: int
include_raw_content: bool
Returns:
str: Formatted string with deduplicated sources
"""
# Collect all results
sources_list = []
for response in search_response:
sources_list.extend(response['results'])
# Deduplicate by URL
unique_sources = {source['url']: source for source in sources_list}
# Format output
formatted_text = "Content from sources:\n"
for i, source in enumerate(unique_sources.values(), 1):
formatted_text += f"{'='*80}\n" # Clear section separator
formatted_text += f"Source: {source['title']}\n"
formatted_text += f"{'-'*80}\n" # Subsection separator
formatted_text += f"URL: {source['url']}\n===\n"
formatted_text += f"Most relevant content from source: {source['content']}\n===\n"
if include_raw_content:
# Using rough estimate of 4 characters per token
char_limit = max_tokens_per_source * 4
# Handle None raw_content
raw_content = source.get('raw_content', '')
if raw_content is None:
raw_content = ''
print(f"Warning: No raw_content found for source {source['url']}")
if len(raw_content) > char_limit:
raw_content = raw_content[:char_limit] + "... [truncated]"
formatted_text += f"Full source content limited to {max_tokens_per_source} tokens: {raw_content}\n\n"
formatted_text += f"{'='*80}\n\n" # End section separator
return formatted_text.strip()
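# Example (hypothetical response shape): the same URL appearing in multiple
# responses collapses to a single formatted source entry.
#   responses = [{"results": [{"title": "A", "url": "https://x.example", "content": "...",
#                              "score": 0.9, "raw_content": None}]}] * 2
#   text = deduplicate_and_format_sources(responses, include_raw_content=False)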
def format_sections(sections: list[Section]) -> str:
""" Format a list of sections into a string """
formatted_str = ""
for idx, section in enumerate(sections, 1):
formatted_str += f"""
{'='*60}
Section {idx}: {section.name}
{'='*60}
Description:
{section.description}
Requires Research:
{section.research}
Content:
{section.content if section.content else '[Not yet written]'}
"""
return formatted_str
@traceable
async def tavily_search_async(search_queries, max_results: int = 5, topic: Literal["general", "news", "finance"] = "general", include_raw_content: bool = True):
"""
Performs concurrent web searches with the Tavily API
Args:
search_queries (List[str]): List of search queries to process
max_results (int): Maximum number of results to return
topic (Literal["general", "news", "finance"]): Topic to filter results by
include_raw_content (bool): Whether to include raw content in the results
Returns:
List[dict]: List of search responses from Tavily API:
{
'query': str,
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the webpage
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full page content if available
},
...
]
}
"""
tavily_async_client = AsyncTavilyClient()
search_tasks = []
for query in search_queries:
search_tasks.append(
tavily_async_client.search(
query,
max_results=max_results,
include_raw_content=include_raw_content,
topic=topic
)
)
# Execute all searches concurrently
search_docs = await asyncio.gather(*search_tasks)
return search_docs
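# Example usage (inside an async context; assumes TAVILY_API_KEY is set):
#   docs = await tavily_search_async(["LangGraph Send API"], max_results=3)
#   print(docs[0]["results"][0]["url"])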
@traceable
async def azureaisearch_search_async(search_queries: list[str], max_results: int = 5, topic: str = "general", include_raw_content: bool = True) -> list[dict]:
"""
Performs concurrent web searches using the Azure AI Search API.
Args:
search_queries (List[str]): list of search queries to process
max_results (int): maximum number of results to return for each query
topic (str): semantic topic filter for the search.
include_raw_content (bool)
Returns:
List[dict]: list of search responses from Azure AI Search API, one per query.
"""
# configure and create the Azure Search client
# ensure all environment variables are set
if not all(var in os.environ for var in ["AZURE_AI_SEARCH_ENDPOINT", "AZURE_AI_SEARCH_INDEX_NAME", "AZURE_AI_SEARCH_API_KEY"]):
raise ValueError("Missing required environment variables for Azure Search API which are: AZURE_AI_SEARCH_ENDPOINT, AZURE_AI_SEARCH_INDEX_NAME, AZURE_AI_SEARCH_API_KEY")
endpoint = os.getenv("AZURE_AI_SEARCH_ENDPOINT")
index_name = os.getenv("AZURE_AI_SEARCH_INDEX_NAME")
credential = AzureKeyCredential(os.getenv("AZURE_AI_SEARCH_API_KEY"))
reranker_key = '@search.reranker_score'
async with AsyncAzureAISearchClient(endpoint, index_name, credential) as client:
async def do_search(query: str) -> dict:
# search query
paged = await client.search(
search_text=query,
vector_queries=[{
"fields": "vector",
"kind": "text",
"text": query,
"exhaustive": True
}],
semantic_configuration_name="fraunhofer-rag-semantic-config",
query_type="semantic",
select=["url", "title", "chunk", "creationTime", "lastModifiedTime"],
top=max_results,
)
# async iterator to get all results
items = [doc async for doc in paged]
            # Convert to a simple dict format
results = [
{
"title": doc.get("title"),
"url": doc.get("url"),
"content": doc.get("chunk"),
"score": doc.get(reranker_key),
"raw_content": doc.get("chunk") if include_raw_content else None
}
for doc in items
]
return {"query": query, "results": results}
# parallelize the search queries
tasks = [do_search(q) for q in search_queries]
return await asyncio.gather(*tasks)
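Before calling the Azure helper, the three environment variables it validates must be present; a hypothetical setup sketch (the placeholder values are illustrative, and the target index must expose the "vector" field and the semantic configuration hardcoded above):

import asyncio
import os
from open_deep_research.utils import azureaisearch_search_async

os.environ["AZURE_AI_SEARCH_ENDPOINT"] = "https://<service>.search.windows.net"  # placeholder
os.environ["AZURE_AI_SEARCH_INDEX_NAME"] = "<index-name>"  # placeholder
os.environ["AZURE_AI_SEARCH_API_KEY"] = "<api-key>"  # placeholder

responses = asyncio.run(azureaisearch_search_async(["semantic ranking"], max_results=3))
print(responses[0]["results"])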
@traceable
def perplexity_search(search_queries):
"""Search the web using the Perplexity API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
Returns:
List[dict]: List of search responses from Perplexity API, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full content or None for secondary citations
},
...
]
}
"""
headers = {
"accept": "application/json",
"content-type": "application/json",
"Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}"
}
search_docs = []
for query in search_queries:
payload = {
"model": "sonar-pro",
"messages": [
{
"role": "system",
"content": "Search the web and provide factual information with sources."
},
{
"role": "user",
"content": query
}
]
}
response = requests.post(
"https://api.perplexity.ai/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status() # Raise exception for bad status codes
# Parse the response
data = response.json()
content = data["choices"][0]["message"]["content"]
citations = data.get("citations", ["https://perplexity.ai"])
# Create results list for this query
results = []
# First citation gets the full content
results.append({
"title": f"Perplexity Search, Source 1",
"url": citations[0],
"content": content,
"raw_content": content,
"score": 1.0 # Adding score to match Tavily format
})
# Add additional citations without duplicating content
for i, citation in enumerate(citations[1:], start=2):
results.append({
"title": f"Perplexity Search, Source {i}",
"url": citation,
"content": "See primary source for full content",
"raw_content": None,
"score": 0.5 # Lower score for secondary sources
})
# Format response to match Tavily structure
search_docs.append({
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": results
})
return search_docs
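perplexity_search is synchronous, so it can be called directly once PERPLEXITY_API_KEY is set; a minimal sketch:

from open_deep_research.utils import perplexity_search

docs = perplexity_search(["current state of open-source LLMs"])
for result in docs[0]["results"]:
    print(result["score"], result["url"])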
@traceable
async def exa_search(search_queries, max_characters: Optional[int] = None, num_results=5,
include_domains: Optional[List[str]] = None,
exclude_domains: Optional[List[str]] = None,
subpages: Optional[int] = None):
"""Search the web using the Exa API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
max_characters (int, optional): Maximum number of characters to retrieve for each result's raw content.
If None, the text parameter will be set to True instead of an object.
num_results (int): Number of search results per query. Defaults to 5.
include_domains (List[str], optional): List of domains to include in search results.
When specified, only results from these domains will be returned.
exclude_domains (List[str], optional): List of domains to exclude from search results.
Cannot be used together with include_domains.
subpages (int, optional): Number of subpages to retrieve per result. If None, subpages are not retrieved.
Returns:
List[dict]: List of search responses from Exa API, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full content or None for secondary citations
},
...
]
}
"""
# Check that include_domains and exclude_domains are not both specified
if include_domains and exclude_domains:
raise ValueError("Cannot specify both include_domains and exclude_domains")
# Initialize Exa client (API key should be configured in your .env file)
    exa = Exa(api_key=os.getenv("EXA_API_KEY"))
# Define the function to process a single query
async def process_query(query):
# Use run_in_executor to make the synchronous exa call in a non-blocking way
loop = asyncio.get_event_loop()
# Define the function for the executor with all parameters
def exa_search_fn():
# Build parameters dictionary
kwargs = {
# Set text to True if max_characters is None, otherwise use an object with max_characters
"text": True if max_characters is None else {"max_characters": max_characters},
"summary": True, # This is an amazing feature by EXA. It provides an AI generated summary of the content based on the query
"num_results": num_results
}
# Add optional parameters only if they are provided
if subpages is not None:
kwargs["subpages"] = subpages
if include_domains:
kwargs["include_domains"] = include_domains
elif exclude_domains:
kwargs["exclude_domains"] = exclude_domains
return exa.search_and_contents(query, **kwargs)
response = await loop.run_in_executor(None, exa_search_fn)
# Format the response to match the expected output structure
formatted_results = []
seen_urls = set() # Track URLs to avoid duplicates
# Helper function to safely get value regardless of if item is dict or object
def get_value(item, key, default=None):
if isinstance(item, dict):
return item.get(key, default)
else:
return getattr(item, key, default) if hasattr(item, key) else default
# Access the results from the SearchResponse object
results_list = get_value(response, 'results', [])
# First process all main results
for result in results_list:
# Get the score with a default of 0.0 if it's None or not present
score = get_value(result, 'score', 0.0)
# Combine summary and text for content if both are available
text_content = get_value(result, 'text', '')
summary_content = get_value(result, 'summary', '')
content = text_content
if summary_content:
if content:
content = f"{summary_content}\n\n{content}"
else:
content = summary_content
title = get_value(result, 'title', '')
url = get_value(result, 'url', '')
# Skip if we've seen this URL before (removes duplicate entries)
if url in seen_urls:
continue
seen_urls.add(url)
# Main result entry
result_entry = {
"title": title,
"url": url,
"content": content,
"score": score,
"raw_content": text_content
}
# Add the main result to the formatted results
formatted_results.append(result_entry)
# Now process subpages only if the subpages parameter was provided
if subpages is not None:
for result in results_list:
subpages_list = get_value(result, 'subpages', [])
for subpage in subpages_list:
# Get subpage score
subpage_score = get_value(subpage, 'score', 0.0)
# Combine summary and text for subpage content
subpage_text = get_value(subpage, 'text', '')
subpage_summary = get_value(subpage, 'summary', '')
subpage_content = subpage_text
if subpage_summary:
if subpage_content:
subpage_content = f"{subpage_summary}\n\n{subpage_content}"
else:
subpage_content = subpage_summary
subpage_url = get_value(subpage, 'url', '')
# Skip if we've seen this URL before
if subpage_url in seen_urls:
continue
seen_urls.add(subpage_url)
formatted_results.append({
"title": get_value(subpage, 'title', ''),
"url": subpage_url,
"content": subpage_content,
"score": subpage_score,
"raw_content": subpage_text
})
# Collect images if available (only from main results to avoid duplication)
images = []
for result in results_list:
image = get_value(result, 'image')
if image and image not in images: # Avoid duplicate images
images.append(image)
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": images,
"results": formatted_results
}
# Process all queries sequentially with delay to respect rate limit
search_docs = []
for i, query in enumerate(search_queries):
try:
# Add delay between requests (0.25s = 4 requests per second, well within the 5/s limit)
if i > 0: # Don't delay the first request
await asyncio.sleep(0.25)
result = await process_query(query)
search_docs.append(result)
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing query '{query}': {str(e)}")
# Add a placeholder result for failed queries to maintain index alignment
search_docs.append({
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": [],
"error": str(e)
})
# Add additional delay if we hit a rate limit error
if "429" in str(e):
print("Rate limit exceeded. Adding additional delay...")
await asyncio.sleep(1.0) # Add a longer delay if we hit a rate limit
return search_docs
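A usage sketch for the Exa helper (assumes EXA_API_KEY is set); it restricts results to a single domain and caps raw content, exercising the include_domains and max_characters branches above:

import asyncio
from open_deep_research.utils import exa_search

docs = asyncio.run(exa_search(
    ["retrieval augmented generation survey"],
    max_characters=2000,  # raw text trimmed to this length
    num_results=3,
    include_domains=["arxiv.org"],
))
print(docs[0]["results"])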
@traceable
async def arxiv_search_async(search_queries, load_max_docs=5, get_full_documents=True, load_all_available_meta=True):
"""
Performs concurrent searches on arXiv using the ArxivRetriever.
Args:
search_queries (List[str]): List of search queries or article IDs
load_max_docs (int, optional): Maximum number of documents to return per query. Default is 5.
get_full_documents (bool, optional): Whether to fetch full text of documents. Default is True.
load_all_available_meta (bool, optional): Whether to load all available metadata. Default is True.
Returns:
List[dict]: List of search responses from arXiv, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [ # List of search results
{
'title': str, # Title of the paper
'url': str, # URL (Entry ID) of the paper
'content': str, # Formatted summary with metadata
'score': float, # Relevance score (approximated)
'raw_content': str|None # Full paper content if available
},
...
]
}
"""
async def process_single_query(query):
try:
# Create retriever for each query
retriever = ArxivRetriever(
load_max_docs=load_max_docs,
get_full_documents=get_full_documents,
load_all_available_meta=load_all_available_meta
)
# Run the synchronous retriever in a thread pool
loop = asyncio.get_event_loop()
docs = await loop.run_in_executor(None, lambda: retriever.invoke(query))
results = []
# Assign decreasing scores based on the order
base_score = 1.0
score_decrement = 1.0 / (len(docs) + 1) if docs else 0
for i, doc in enumerate(docs):
# Extract metadata
metadata = doc.metadata
# Use entry_id as the URL (this is the actual arxiv link)
url = metadata.get('entry_id', '')
# Format content with all useful metadata
content_parts = []
# Primary information
if 'Summary' in metadata:
content_parts.append(f"Summary: {metadata['Summary']}")
if 'Authors' in metadata:
content_parts.append(f"Authors: {metadata['Authors']}")
# Add publication information
published = metadata.get('Published')
published_str = published.isoformat() if hasattr(published, 'isoformat') else str(published) if published else ''
if published_str:
content_parts.append(f"Published: {published_str}")
# Add additional metadata if available
if 'primary_category' in metadata:
content_parts.append(f"Primary Category: {metadata['primary_category']}")
if 'categories' in metadata and metadata['categories']:
content_parts.append(f"Categories: {', '.join(metadata['categories'])}")
if 'comment' in metadata and metadata['comment']:
content_parts.append(f"Comment: {metadata['comment']}")
if 'journal_ref' in metadata and metadata['journal_ref']:
content_parts.append(f"Journal Reference: {metadata['journal_ref']}")
if 'doi' in metadata and metadata['doi']:
content_parts.append(f"DOI: {metadata['doi']}")
# Get PDF link if available in the links
pdf_link = ""
if 'links' in metadata and metadata['links']:
for link in metadata['links']:
if 'pdf' in link:
pdf_link = link
content_parts.append(f"PDF: {pdf_link}")
break
# Join all content parts with newlines
content = "\n".join(content_parts)
result = {
'title': metadata.get('Title', ''),
'url': url, # Using entry_id as the URL
'content': content,
'score': base_score - (i * score_decrement),
'raw_content': doc.page_content if get_full_documents else None
}
results.append(result)
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing arXiv query '{query}': {str(e)}")
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
}
# Process queries sequentially with delay to respect arXiv rate limit (1 request per 3 seconds)
search_docs = []
for i, query in enumerate(search_queries):
try:
# Add delay between requests (3 seconds per ArXiv's rate limit)
if i > 0: # Don't delay the first request
await asyncio.sleep(3.0)
result = await process_single_query(query)
search_docs.append(result)
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing arXiv query '{query}': {str(e)}")
search_docs.append({
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
})
# Add additional delay if we hit a rate limit error
if "429" in str(e) or "Too Many Requests" in str(e):
print("ArXiv rate limit exceeded. Adding additional delay...")
await asyncio.sleep(5.0) # Add a longer delay if we hit a rate limit
return search_docs
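A minimal arXiv sketch; fetching metadata only (get_full_documents=False) keeps each query light, and no API key is required:

import asyncio
from open_deep_research.utils import arxiv_search_async

docs = asyncio.run(arxiv_search_async(["diffusion models"], load_max_docs=3,
                                      get_full_documents=False))
for result in docs[0]["results"]:
    print(f"{result['score']:.2f} {result['title']}")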
@traceable
async def pubmed_search_async(search_queries, top_k_results=5, email=None, api_key=None, doc_content_chars_max=4000):
"""
Performs concurrent searches on PubMed using the PubMedAPIWrapper.
Args:
search_queries (List[str]): List of search queries
top_k_results (int, optional): Maximum number of documents to return per query. Default is 5.
email (str, optional): Email address for PubMed API. Required by NCBI.
api_key (str, optional): API key for PubMed API for higher rate limits.
doc_content_chars_max (int, optional): Maximum characters for document content. Default is 4000.
Returns:
List[dict]: List of search responses from PubMed, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [ # List of search results
{
'title': str, # Title of the paper
'url': str, # URL to the paper on PubMed
'content': str, # Formatted summary with metadata
'score': float, # Relevance score (approximated)
'raw_content': str # Full abstract content
},
...
]
}
"""
async def process_single_query(query):
try:
# print(f"Processing PubMed query: '{query}'")
# Create PubMed wrapper for the query
wrapper = PubMedAPIWrapper(
top_k_results=top_k_results,
doc_content_chars_max=doc_content_chars_max,
email=email if email else "[email protected]",
api_key=api_key if api_key else ""
)
# Run the synchronous wrapper in a thread pool
loop = asyncio.get_event_loop()
# Use wrapper.lazy_load instead of load to get better visibility
docs = await loop.run_in_executor(None, lambda: list(wrapper.lazy_load(query)))
print(f"Query '{query}' returned {len(docs)} results")
results = []
# Assign decreasing scores based on the order
base_score = 1.0
score_decrement = 1.0 / (len(docs) + 1) if docs else 0
for i, doc in enumerate(docs):
# Format content with metadata
content_parts = []
if doc.get('Published'):
content_parts.append(f"Published: {doc['Published']}")
if doc.get('Copyright Information'):
content_parts.append(f"Copyright Information: {doc['Copyright Information']}")
if doc.get('Summary'):
content_parts.append(f"Summary: {doc['Summary']}")
# Generate PubMed URL from the article UID
uid = doc.get('uid', '')
url = f"https://pubmed.ncbi.nlm.nih.gov/{uid}/" if uid else ""
# Join all content parts with newlines
content = "\n".join(content_parts)
result = {
'title': doc.get('Title', ''),
'url': url,
'content': content,
'score': base_score - (i * score_decrement),
'raw_content': doc.get('Summary', '')
}
results.append(result)
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
except Exception as e:
# Handle exceptions with more detailed information
error_msg = f"Error processing PubMed query '{query}': {str(e)}"
print(error_msg)
import traceback
print(traceback.format_exc()) # Print full traceback for debugging
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
}
# Process all queries with a reasonable delay between them
search_docs = []
# Start with a small delay that increases if we encounter rate limiting
delay = 1.0 # Start with a more conservative delay
for i, query in enumerate(search_queries):
try:
# Add delay between requests
if i > 0: # Don't delay the first request
# print(f"Waiting {delay} seconds before next query...")
await asyncio.sleep(delay)
result = await process_single_query(query)
search_docs.append(result)
# If query was successful with results, we can slightly reduce delay (but not below minimum)
if result.get('results') and len(result['results']) > 0:
delay = max(0.5, delay * 0.9) # Don't go below 0.5 seconds
except Exception as e:
# Handle exceptions gracefully
error_msg = f"Error in main loop processing PubMed query '{query}': {str(e)}"
print(error_msg)
search_docs.append({
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
})
# If we hit an exception, increase delay for next query
delay = min(5.0, delay * 1.5) # Don't exceed 5 seconds
return search_docs
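A minimal PubMed sketch; the email address below is a hypothetical placeholder (NCBI asks for a contact address, and an API key raises the rate limit but is optional):

import asyncio
from open_deep_research.utils import pubmed_search_async

docs = asyncio.run(pubmed_search_async(
    ["CRISPR off-target effects"],
    top_k_results=3,
    email="you@example.com",  # hypothetical contact address
))
print(docs[0]["results"])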
@traceable
async def linkup_search(search_queries, depth: Optional[str] = "standard"):
"""
Performs concurrent web searches using the Linkup API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
depth (str, optional): "standard" (default) or "deep". More details here https://docs.linkup.so/pages/documentation/get-started/concepts
Returns:
List[dict]: List of search responses from Linkup API, one per query. Each response has format:
{
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
},
...
]
}
"""
client = LinkupClient()
search_tasks = []
for query in search_queries:
search_tasks.append(
client.async_search(
query,
depth,
output_type="searchResults",
)
)
search_results = []
for response in await asyncio.gather(*search_tasks):
search_results.append(
{
"results": [
{"title": result.name, "url": result.url, "content": result.content}
for result in response.results
],
}
)
return search_results
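A minimal Linkup sketch; it assumes the Linkup API key is configured in the environment so that LinkupClient() can authenticate:

import asyncio
from open_deep_research.utils import linkup_search

responses = asyncio.run(linkup_search(["quantum error correction"], depth="standard"))
print(responses[0]["results"])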
@traceable
async def google_search_async(search_queries: Union[str, List[str]], max_results: int = 5, include_raw_content: bool = True):
"""
Performs concurrent web searches using Google.
Uses Google Custom Search API if environment variables are set, otherwise falls back to web scraping.
Args:
        search_queries (Union[str, List[str]]): A single search query or a list of queries to process
max_results (int): Maximum number of results to return per query
include_raw_content (bool): Whether to fetch full page content
Returns:
List[dict]: List of search responses from Google, one per query
"""
# Check for API credentials from environment variables
api_key = os.environ.get("GOOGLE_API_KEY")
cx = os.environ.get("GOOGLE_CX")
use_api = bool(api_key and cx)
# Handle case where search_queries is a single string
if isinstance(search_queries, str):
search_queries = [search_queries]
# Define user agent generator
def get_useragent():
"""Generates a random user agent string."""
lynx_version = f"Lynx/{random.randint(2, 3)}.{random.randint(8, 9)}.{random.randint(0, 2)}"
libwww_version = f"libwww-FM/{random.randint(2, 3)}.{random.randint(13, 15)}"
ssl_mm_version = f"SSL-MM/{random.randint(1, 2)}.{random.randint(3, 5)}"
openssl_version = f"OpenSSL/{random.randint(1, 3)}.{random.randint(0, 4)}.{random.randint(0, 9)}"
return f"{lynx_version} {libwww_version} {ssl_mm_version} {openssl_version}"
# Create executor for running synchronous operations
executor = None if use_api else concurrent.futures.ThreadPoolExecutor(max_workers=5)
# Use a semaphore to limit concurrent requests
semaphore = asyncio.Semaphore(5 if use_api else 2)
async def search_single_query(query):
async with semaphore:
try:
results = []
# API-based search
if use_api:
# The API returns up to 10 results per request
for start_index in range(1, max_results + 1, 10):
# Calculate how many results to request in this batch
num = min(10, max_results - (start_index - 1))
# Make request to Google Custom Search API
params = {
'q': query,
'key': api_key,
'cx': cx,
'start': start_index,
'num': num
}
print(f"Requesting {num} results for '{query}' from Google API...")
async with aiohttp.ClientSession() as session:
async with session.get('https://www.googleapis.com/customsearch/v1', params=params) as response:
if response.status != 200:
error_text = await response.text()
print(f"API error: {response.status}, {error_text}")
break
data = await response.json()
# Process search results
for item in data.get('items', []):
result = {
"title": item.get('title', ''),
"url": item.get('link', ''),
"content": item.get('snippet', ''),
"score": None,
"raw_content": item.get('snippet', '')
}
results.append(result)
# Respect API quota with a small delay
await asyncio.sleep(0.2)
# If we didn't get a full page of results, no need to request more
if not data.get('items') or len(data.get('items', [])) < num:
break
# Web scraping based search
else:
# Add delay between requests
await asyncio.sleep(0.5 + random.random() * 1.5)
print(f"Scraping Google for '{query}'...")
# Define scraping function
def google_search(query, max_results):
try:
lang = "en"
safe = "active"
start = 0
fetched_results = 0
fetched_links = set()
search_results = []
while fetched_results < max_results:
# Send request to Google
resp = requests.get(
url="https://www.google.com/search",
headers={
"User-Agent": get_useragent(),
"Accept": "*/*"
},
params={
"q": query,
"num": max_results + 2,
"hl": lang,
"start": start,
"safe": safe,
},
cookies = {
'CONSENT': 'PENDING+987', # Bypasses the consent page
'SOCS': 'CAESHAgBEhIaAB',
}
)
resp.raise_for_status()
# Parse results
soup = BeautifulSoup(resp.text, "html.parser")
result_block = soup.find_all("div", class_="ezO2md")
new_results = 0
for result in result_block:
link_tag = result.find("a", href=True)
title_tag = link_tag.find("span", class_="CVA68e") if link_tag else None
description_tag = result.find("span", class_="FrIlee")
if link_tag and title_tag and description_tag:
link = unquote(link_tag["href"].split("&")[0].replace("/url?q=", ""))
if link in fetched_links:
continue
fetched_links.add(link)
title = title_tag.text
description = description_tag.text
# Store result in the same format as the API results
search_results.append({
"title": title,
"url": link,
"content": description,
"score": None,
"raw_content": description
})
fetched_results += 1
new_results += 1
if fetched_results >= max_results:
break
if new_results == 0:
break
start += 10
time.sleep(1) # Delay between pages
return search_results
except Exception as e:
print(f"Error in Google search for '{query}': {str(e)}")
return []
# Execute search in thread pool
loop = asyncio.get_running_loop()
search_results = await loop.run_in_executor(
executor,
lambda: google_search(query, max_results)
)
# Process the results
results = search_results
# If requested, fetch full page content asynchronously (for both API and web scraping)
if include_raw_content and results:
content_semaphore = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
fetch_tasks = []
async def fetch_full_content(result):
async with content_semaphore:
url = result['url']
headers = {
'User-Agent': get_useragent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
}
try:
await asyncio.sleep(0.2 + random.random() * 0.6)
async with session.get(url, headers=headers, timeout=10) as response:
if response.status == 200:
# Check content type to handle binary files
content_type = response.headers.get('Content-Type', '').lower()
# Handle PDFs and other binary files
if 'application/pdf' in content_type or 'application/octet-stream' in content_type:
# For PDFs, indicate that content is binary and not parsed
result['raw_content'] = f"[Binary content: {content_type}. Content extraction not supported for this file type.]"
else:
try:
# Try to decode as UTF-8 with replacements for non-UTF8 characters
html = await response.text(errors='replace')
soup = BeautifulSoup(html, 'html.parser')
result['raw_content'] = soup.get_text()
except UnicodeDecodeError as ude:
# Fallback if we still have decoding issues
result['raw_content'] = f"[Could not decode content: {str(ude)}]"
except Exception as e:
print(f"Warning: Failed to fetch content for {url}: {str(e)}")
result['raw_content'] = f"[Error fetching content: {str(e)}]"
return result
for result in results:
fetch_tasks.append(fetch_full_content(result))
updated_results = await asyncio.gather(*fetch_tasks)
results = updated_results
print(f"Fetched full content for {len(results)} results")
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": results
}
except Exception as e:
print(f"Error in Google search for query '{query}': {str(e)}")
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": []
}
try:
# Create tasks for all search queries
search_tasks = [search_single_query(query) for query in search_queries]
# Execute all searches concurrently
search_results = await asyncio.gather(*search_tasks)
return search_results
finally:
# Only shut down executor if it was created
if executor:
executor.shutdown(wait=False)
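A minimal Google sketch; with GOOGLE_API_KEY and GOOGLE_CX set it uses the Custom Search API, otherwise it falls back to the scraping path above, which is slower and more fragile:

import asyncio
from open_deep_research.utils import google_search_async

docs = asyncio.run(google_search_async("langgraph state machines",
                                       max_results=3, include_raw_content=False))
print(docs[0]["results"])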
async def scrape_pages(titles: List[str], urls: List[str]) -> str:
"""
Scrapes content from a list of URLs and formats it into a readable markdown document.
This function:
1. Takes a list of page titles and URLs
2. Makes asynchronous HTTP requests to each URL
3. Converts HTML content to markdown
4. Formats all content with clear source attribution
Args:
titles (List[str]): A list of page titles corresponding to each URL
urls (List[str]): A list of URLs to scrape content from
Returns:
str: A formatted string containing the full content of each page in markdown format,
with clear section dividers and source attribution
"""
# Create an async HTTP client
async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
pages = []
# Fetch each URL and convert to markdown
for url in urls:
try:
# Fetch the content
response = await client.get(url)
response.raise_for_status()
                # raise_for_status() has already raised for any error status,
                # so the response here is a success; handle content types directly
                content_type = response.headers.get('Content-Type', '')
                if 'text/html' in content_type:
                    # Convert HTML to markdown
                    markdown_content = markdownify(response.text)
                    pages.append(markdown_content)
                else:
                    # For non-HTML content, just mention the content type
                    pages.append(f"Content type: {content_type} (not converted to markdown)")
except Exception as e:
# Handle any exceptions during fetch
pages.append(f"Error fetching URL: {str(e)}")
# Create formatted output
formatted_output = f"Search results: \n\n"
for i, (title, url, page) in enumerate(zip(titles, urls, pages)):
formatted_output += f"\n\n--- SOURCE {i+1}: {title} ---\n"
formatted_output += f"URL: {url}\n\n"
formatted_output += f"FULL CONTENT:\n {page}"
formatted_output += "\n\n" + "-" * 80 + "\n"
return formatted_output
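A minimal scraping sketch; the titles list only labels the output sections, so any descriptive strings work:

import asyncio
from open_deep_research.utils import scrape_pages

markdown = asyncio.run(scrape_pages(
    ["LangGraph documentation"],
    ["https://langchain-ai.github.io/langgraph/"],
))
print(markdown[:500])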
@tool
async def duckduckgo_search(search_queries: List[str]):
"""Perform searches using DuckDuckGo with retry logic to handle rate limits
Args:
search_queries (List[str]): List of search queries to process
Returns:
List[dict]: List of search results
"""
async def process_single_query(query):
# Execute synchronous search in the event loop's thread pool
loop = asyncio.get_event_loop()
def perform_search():
max_retries = 3
retry_count = 0
backoff_factor = 2.0
last_exception = None
while retry_count <= max_retries:
try:
results = []
with DDGS() as ddgs:
# Change query slightly and add delay between retries
if retry_count > 0:
# Random delay with exponential backoff
delay = backoff_factor ** retry_count + random.random()
print(f"Retry {retry_count}/{max_retries} for query '{query}' after {delay:.2f}s delay")
time.sleep(delay)
# Add a random element to the query to bypass caching/rate limits
modifiers = ['about', 'info', 'guide', 'overview', 'details', 'explained']
modified_query = f"{query} {random.choice(modifiers)}"
else:
modified_query = query
# Execute search
ddg_results = list(ddgs.text(modified_query, max_results=5))
# Format results
for i, result in enumerate(ddg_results):
results.append({
'title': result.get('title', ''),
'url': result.get('href', ''),
'content': result.get('body', ''),
'score': 1.0 - (i * 0.1), # Simple scoring mechanism
'raw_content': result.get('body', '')
})
# Return successful results
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
except Exception as e:
# Store the exception and retry
last_exception = e
retry_count += 1
print(f"DuckDuckGo search error: {str(e)}. Retrying {retry_count}/{max_retries}")
# If not a rate limit error, don't retry
if "Ratelimit" not in str(e) and retry_count >= 1:
print(f"Non-rate limit error, stopping retries: {str(e)}")
break
# If we reach here, all retries failed
print(f"All retries failed for query '{query}': {str(last_exception)}")
# Return empty results but with query info preserved
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(last_exception)
}
return await loop.run_in_executor(None, perform_search)
# Process queries with delay between them to reduce rate limiting
search_docs = []
urls = []
titles = []
for i, query in enumerate(search_queries):
# Add delay between queries (except first one)
if i > 0:
delay = 2.0 + random.random() * 2.0 # Random delay 2-4 seconds
await asyncio.sleep(delay)
# Process the query
result = await process_single_query(query)
search_docs.append(result)
# Safely extract URLs and titles from results, handling empty result cases
if result['results'] and len(result['results']) > 0:
for res in result['results']:
if 'url' in res and 'title' in res:
urls.append(res['url'])
titles.append(res['title'])
# If we got any valid URLs, scrape the pages
if urls:
return await scrape_pages(titles, urls)
else:
# Return a formatted error message if no valid URLs were found
return "No valid search results found. Please try different search queries or use a different search API."
@tool
async def tavily_search(queries: List[str], max_results: int = 5, topic: Literal["general", "news", "finance"] = "general") -> str:
"""
Fetches results from Tavily search API.
Args:
queries (List[str]): List of search queries
max_results (int): Maximum number of results to return
topic (Literal["general", "news", "finance"]): Topic to filter results by
Returns:
str: A formatted string of search results
"""
    # Use tavily_search_async with include_raw_content=True to get content directly,
    # forwarding the tool's own max_results and topic arguments
    search_results = await tavily_search_async(
        queries,
        max_results=max_results,
        topic=topic,
        include_raw_content=True
    )
# Format the search results directly using the raw_content already provided
formatted_output = f"Search results: \n\n"
# Deduplicate results by URL
unique_results = {}
for response in search_results:
for result in response['results']:
url = result['url']
if url not in unique_results:
unique_results[url] = result
# Format the unique results
for i, (url, result) in enumerate(unique_results.items()):
formatted_output += f"\n\n--- SOURCE {i+1}: {result['title']} ---\n"
formatted_output += f"URL: {url}\n\n"
formatted_output += f"SUMMARY:\n{result['content']}\n\n"
if result.get('raw_content'):
formatted_output += f"FULL CONTENT:\n{result['raw_content'][:30000]}" # Limit content size
formatted_output += "\n\n" + "-" * 80 + "\n"
if unique_results:
return formatted_output
else:
return "No valid search results found. Please try different search queries or use a different search API."
@tool
async def azureaisearch_search(queries: List[str], max_results: int = 5, topic: str = "general") -> str:
"""
Fetches results from Azure AI Search API.
Args:
        queries (List[str]): List of search queries
        max_results (int): Maximum number of results to return per query
        topic (str): Semantic topic filter for the search
Returns:
str: A formatted string of search results
"""
# Use azureaisearch_search_async with include_raw_content=True to get content directly
search_results = await azureaisearch_search_async(
queries,
max_results=max_results,
topic=topic,
include_raw_content=True
)
# Format the search results directly using the raw_content already provided
formatted_output = f"Search results: \n\n"
# Deduplicate results by URL
unique_results = {}
for response in search_results:
for result in response['results']:
url = result['url']
if url not in unique_results:
unique_results[url] = result
# Format the unique results
for i, (url, result) in enumerate(unique_results.items()):
formatted_output += f"\n\n--- SOURCE {i+1}: {result['title']} ---\n"
formatted_output += f"URL: {url}\n\n"
formatted_output += f"SUMMARY:\n{result['content']}\n\n"
if result.get('raw_content'):
formatted_output += f"FULL CONTENT:\n{result['raw_content'][:30000]}" # Limit content size
formatted_output += "\n\n" + "-" * 80 + "\n"
if unique_results:
return formatted_output
else:
return "No valid search results found. Please try different search queries or use a different search API."
async def select_and_execute_search(search_api: str, query_list: list[str], params_to_pass: dict) -> str:
"""Select and execute the appropriate search API.
Args:
search_api: Name of the search API to use
query_list: List of search queries to execute
params_to_pass: Parameters to pass to the search API
Returns:
Formatted string containing search results
Raises:
ValueError: If an unsupported search API is specified
"""
print(f"query_list: {query_list} params_to_pass: {params_to_pass}")
if search_api == "tavily":
# Tavily search tool used with both workflow and agent
        return await tavily_search.ainvoke({'queries': query_list, **params_to_pass})
elif search_api == "duckduckgo":
# DuckDuckGo search tool used with both workflow and agent
return await duckduckgo_search.ainvoke({'search_queries': query_list})
elif search_api == "perplexity":
search_results = perplexity_search(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "exa":
search_results = await exa_search(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "arxiv":
search_results = await arxiv_search_async(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "pubmed":
search_results = await pubmed_search_async(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "linkup":
search_results = await linkup_search(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "googlesearch":
search_results = await google_search_async(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
elif search_api == "azureaisearch":
search_results = await azureaisearch_search_async(query_list, **params_to_pass)
return deduplicate_and_format_sources(search_results, max_tokens_per_source=4000)
else:
raise ValueError(f"Unsupported search API: {search_api}")
Summary
This compilation contains all 7 Python files from the open_deep_research
directory. Together they implement a deep research platform that automates planning, researching, and writing detailed reports on any topic.
The system provides:
- Intelligent Report Planning - Uses LLMs to generate structured report outlines with human feedback integration
- Multi-Source Web Research - Supports 8+ search APIs including Tavily, Perplexity, Exa, arXiv, PubMed, Google, DuckDuckGo, and Azure AI Search
- Parallel Section Processing - Employs LangGraph's Send API for concurrent research and writing of report sections
- Iterative Research Refinement - Automatically evaluates section quality and performs follow-up research when needed
- Multi-Agent Architecture - Includes both workflow-based and agent-based implementations for different use cases
- Advanced Configuration Management - Flexible configuration system supporting multiple LLM providers (OpenAI, Anthropic) and search APIs
- Comprehensive Utility Functions - Extensive search integration, content formatting, and source deduplication capabilities
The platform is built using LangGraph for workflow orchestration, supports both cloud-based and local LLM deployments, and is designed for scalable, high-quality research report generation across academic, business, and technical domains.
Total Code Size: 117,476 bytes across 2,827 lines of Python code.