Please join our community: https://discord.gg/qzArY9sMnr

Our website: https://tesslate.com

This is using the TframeX runtime (a looping agent that acts as a chatbot to run your flow):

import asyncio
import os
import json # For structured outputs from agents if needed
from typing import List, Dict, Any
from dotenv import load_dotenv
from tframex import TFrameXApp, OpenAIChatLLM, Message, Tool, LLMAgent, Flow, SequentialPattern
from tframex.patterns import ParallelPattern # If we want to process multiple articles in parallel

# --- Configuration & Setup ---
load_dotenv()

# 1. Configure your LLM (using environment variables)
# Ensure OPENAI_API_BASE, OPENAI_API_KEY, OPENAI_MODEL_NAME are in your .env
# Example for local Ollama:
# OPENAI_API_BASE="<http://localhost:11434/v1>"
# OPENAI_API_KEY="ollama"
# OPENAI_MODEL_NAME="llama3" # Or your preferred model

# For more complex tasks, you might want different LLMs for different agents
default_llm = OpenAIChatLLM(
    model_name=os.getenv("OPENAI_MODEL_NAME", "gpt-3.5-turbo"),
    api_base_url=os.getenv("OPENAI_API_BASE"),
    api_key=os.getenv("OPENAI_API_KEY")
)

# A more powerful LLM for synthesis/analysis
powerful_llm = OpenAIChatLLM(
    model_name=os.getenv("POWERFUL_MODEL_NAME", "gpt-4-turbo"), # Or your best available model
    api_base_url=os.getenv("OPENAI_API_BASE_POWERFUL", os.getenv("OPENAI_API_BASE")),
    api_key=os.getenv("OPENAI_API_KEY_POWERFUL", os.getenv("OPENAI_API_KEY"))
)

if not default_llm.api_base_url or (powerful_llm and not powerful_llm.api_base_url):
    print("Error: LLM API base URL not configured. Check .env or OpenAIChatLLM instantiation.")
    exit()

# 2. Initialize TFrameXApp
app = TFrameXApp(default_llm=default_llm)

# --- Tool Definitions ---

@app.tool(description="Searches the web for a given query and returns top N results (URL, title, snippet).")
async def web_search_tool(query: str, num_results: int = 3) -> List[Dict[str, str]]:
    """
    MOCK IMPLEMENTATION: In a real application, this would call a search API.
    Returns a list of dictionaries, each with "url", "title", and "snippet".
    """
    print(f"TOOL: web_search_tool called with query: '{query}', num_results: {num_results}")
    # Replace with actual API call (e.g., Tavily, SerpAPI, Google Search API)
    mock_results = {
        "benefits of AI in healthcare": [
            {"url": "<https://example.com/ai-healthcare-benefits>", "title": "AI Benefits in Healthcare", "snippet": "AI revolutionizes diagnostics, drug discovery, and personalized treatment..."},
            {"url": "<https://example.com/ai-drug-discovery>", "title": "AI Speeds Up Drug Discovery", "snippet": "Machine learning models analyze vast datasets to find potential drug candidates..."},
        ],
        "risks of AI in healthcare": [
            {"url": "<https://example.com/ai-healthcare-risks>", "title": "Risks of AI in Medicine", "snippet": "Data privacy, algorithmic bias, and job displacement are key concerns..."},
            {"url": "<https://example.com/ai-bias-healthcare>", "title": "Algorithmic Bias in AI Healthcare", "snippet": "AI models trained on biased data can perpetuate health disparities..."},
        ]
    }
    results = mock_results.get(query.lower(), [])
    return results[:num_results]

@app.tool(description="Fetches the main text content from a given URL.")
async def fetch_page_content_tool(url: str) -> str:
    """
    MOCK IMPLEMENTATION: In a real application, this would fetch and parse HTML.
    """
    print(f"TOOL: fetch_page_content_tool called for URL: {url}")
    # Replace with actual fetching (e.g., httpx) and parsing (e.g., BeautifulSoup, selectolax)
    mock_content = {
        "<https://example.com/ai-healthcare-benefits>": "Detailed content about AI benefits in healthcare including improved diagnostics, efficient drug discovery, and personalized patient care plans. It also touches upon robotic surgery.",
        "<https://example.com/ai-drug-discovery>": "In-depth look at how AI accelerates the drug discovery pipeline by analyzing molecular structures and predicting efficacy, significantly reducing time and cost.",
        "<https://example.com/ai-healthcare-risks>": "Comprehensive discussion on the ethical and practical risks of AI in healthcare, such as data security vulnerabilities, the potential for biased algorithms leading to unequal care, and the challenge of regulatory oversight.",
        "<https://example.com/ai-bias-healthcare>": "Focuses specifically on algorithmic bias, how it arises from unrepresentative datasets, and its impact on minority groups in healthcare settings, proposing mitigation strategies."
    }
    if "nonexistent" in url: # Simulate a failure
        return "Error: Could not fetch content from URL."
    return mock_content.get(url, f"Mock content for {url}. This page discusses various aspects of the topic.")

# --- Agent Definitions ---

@app.agent(
    name="QueryGeneratorAgent",
    description="Generates diverse and effective search queries based on a research topic.",
    system_prompt=(
        "You are a research assistant specializing in query formulation. "
        "Given a research topic: {research_topic}, generate 3-5 diverse and effective search engine queries "
        "to gather comprehensive information. Consider different angles, keywords, and types of information (e.g., benefits, risks, examples, statistics)."
        "Output ONLY a JSON list of strings, where each string is a search query. Example: [\\"query1\\", \\"query2\\"]"
    ),
    strip_think_tags=True
)
async def query_generator_agent_func(): pass

@app.agent(
    name="InformationRetrieverAgent",
    description="Uses web search and page fetching tools to gather relevant information based on search queries.",
    system_prompt=(
        "You are an Information Retriever. Your task is to use the 'web_search_tool' with the provided search queries "
        "to find relevant URLs, and then use the 'fetch_page_content_tool' for the most promising URLs (max 2 per query) to get their full content. "
        "Compile a list of objects, where each object contains the 'url' and its 'full_content'."
        "Prioritize credible sources and diverse perspectives if possible from snippets."
        "If content fetching fails for a URL, note it but try others."
        "Input will be a list of search queries (JSON string). Output a JSON list of {'url': str, 'full_content': str}."
        "Available tools: {available_tools_descriptions}"
    ),
    tools=["web_search_tool", "fetch_page_content_tool"],
    strip_think_tags=True
)
async def information_retriever_agent_func(): pass

@app.agent(
    name="InsightExtractorAgent",
    description="Extracts key insights, facts, arguments, and data points from provided text content relevant to a research topic.",
    system_prompt=(
        "You are an Insight Extractor. Given a list of articles (each with 'url' and 'full_content') and a research_topic: {research_topic}, "
        "carefully read each article's content. For each article, extract key insights, facts, main arguments, counter-arguments, "
        "and any notable data points or statistics relevant to the research topic."
        "Be concise and focus on the most important information. If an article is not relevant, state that."
        "Output a JSON list, where each item corresponds to an article and contains {'url': str, 'extracted_insights': str (a summary of insights from this article)}."
    ),
    llm=powerful_llm, # Use a more capable LLM for this
    strip_think_tags=True
)
async def insight_extractor_agent_func(): pass

@app.agent(
    name="ReportSynthesizerAgent",
    description="Synthesizes extracted insights into a comprehensive research report.",
    system_prompt=(
        "You are a Research Report Writer. You will receive a list of extracted insights from various sources, each tagged by its URL, "
        "all related to the research topic: {research_topic}. Your task is to synthesize this information into a comprehensive and coherent "
        "research report. The report should include:\\n"
        "1. Introduction: Briefly introduce the topic and the scope of the report.\\n"
        "2. Key Findings: Present the main insights, facts, and arguments derived from the sources. Organize them thematically.\\n"
        "3. Analysis & Discussion: (If applicable) Discuss any conflicting information, identify patterns, or offer brief analysis.\\n"
        "4. Conclusion: Summarize the key takeaways.\\n"
        "5. Potential Gaps/Further Research: Briefly mention any areas that seem underexplored or warrant further investigation.\\n"
        "Ensure the report is well-structured, clear, and objective. Cite information by mentioning general themes from sources rather than direct URL citations in the final text for readability."
        "You are generating the *final report content*."
    ),
    llm=powerful_llm,
    strip_think_tags=True
)
async def report_synthesizer_agent_func(): pass

# --- Flow Definition (Sequential Approach) ---
# For more complex scenarios, a Supervisor Agent using Agent-as-Tool would be even better.
# This sequential flow is easier to start with.

deep_research_flow = Flow(
    flow_name="DeepResearchFlow",
    description="Orchestrates agents to perform deep research on a topic."
)
# The output of one agent becomes the input message (content) for the next.
# TFrameX handles message passing. We need to ensure agents are prompted
# to expect input from the previous step and produce output for the next.
# We will use `template_vars` to pass the `research_topic` throughout the flow.

deep_research_flow.add_step("QueryGeneratorAgent")       # Input: research_topic (via template_vars), Output: JSON list of queries
deep_research_flow.add_step("InformationRetrieverAgent") # Input: JSON list of queries, Output: JSON list of {url, full_content}
deep_research_flow.add_step("InsightExtractorAgent")     # Input: JSON list of {url, full_content}, Output: JSON list of {url, extracted_insights}
deep_research_flow.add_step("ReportSynthesizerAgent")    # Input: JSON list of {url, extracted_insights}, Output: Final research report

app.register_flow(deep_research_flow)

# --- Main Execution Logic ---
async def main():
    research_topic = "The impact of AI on healthcare: benefits and risks"
    print(f"--- Starting Deep Research Flow for topic: '{research_topic}' ---")

    async with app.run_context() as rt:
        initial_message = Message(
            role="user",
            # The initial message content can be minimal if the first agent primarily uses template_vars
            # Or, it could be the research topic itself if the agent is designed to parse it from user content.
            # For QueryGeneratorAgent, the system prompt uses {research_topic} from template_vars.
            content=f"Please start the research process for the topic: {research_topic}"
        )

        flow_context = await rt.run_flow(
            "DeepResearchFlow",
            initial_message,
            # `flow_template_vars` makes these variables available to all agents in the flow
            # if their system prompts include the corresponding placeholders.
            flow_template_vars={"research_topic": research_topic}
        )

        final_report = flow_context.current_message.content
        print("\\n\\n--- Generated Research Report ---")
        print(final_report)

        print("\\n\\n--- Flow Execution History (Simplified) ---")
        for i, msg_record in enumerate(flow_context.history):
            print(f"Step {i}: Role: {msg_record.message.role}, Agent: {msg_record.agent_name or 'FlowInput'}")
            if msg_record.message.tool_calls:
                print(f"  Tool Calls: {[tc.function.name for tc in msg_record.message.tool_calls]}")
            # Truncate long content for display
            content_preview = (msg_record.message.content[:200] + '...' if len(msg_record.message.content) > 200 else msg_record.message.content).replace('\\n', ' ')
            print(f"  Content: {content_preview}\\n")

if __name__ == "__main__":
    asyncio.run(main())

And here we have a FastAPI example:

import asyncio
import os
import json
import logging
from typing import List, Dict, Any

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware # For allowing frontend access
from pydantic import BaseModel

from tframex import TFrameXApp, OpenAIChatLLM, Message, Tool, LLMAgent, Flow

# --- Configuration & Setup ---
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("DeepResearchChatbot")

# 1. Configure LLMs (Global, will be used by TFrameXApp)
default_llm = OpenAIChatLLM(
    model_name=os.getenv("OPENAI_MODEL_NAME", "gpt-3.5-turbo"),
    api_base_url=os.getenv("OPENAI_API_BASE"),
    api_key=os.getenv("OPENAI_API_KEY")
)

powerful_llm = OpenAIChatLLM(
    model_name=os.getenv("POWERFUL_MODEL_NAME", "gpt-4-turbo"),
    api_base_url=os.getenv("OPENAI_API_BASE_POWERFUL", os.getenv("OPENAI_API_BASE")),
    api_key=os.getenv("OPENAI_API_KEY_POWERFUL", os.getenv("OPENAI_API_KEY"))
)

# Global TFrameXApp instance
# This will be initialized once when the FastAPI app starts
tframex_app: TFrameXApp = None

def initialize_tframex_app():
    """Initializes the global TFrameX app with agents, tools, and flows."""
    global tframex_app
    if tframex_app is not None:
        return

    if not default_llm.api_base_url or (powerful_llm and not powerful_llm.api_base_url):
        logger.error("LLM API base URL not configured. Check .env. Exiting.")
        raise RuntimeError("LLM Configuration Error")

    tframex_app = TFrameXApp(default_llm=default_llm)
    logger.info("TFrameXApp initialized.")

    # --- Tool Definitions (Registered with the global tframex_app) ---
    @tframex_app.tool(description="Searches the web for a given query and returns top N results (URL, title, snippet).")
    async def web_search_tool(query: str, num_results: int = 3) -> List[Dict[str, str]]:
        logger.info(f"TOOL: web_search_tool called with query: '{query}', num_results: {num_results}")
        # MOCK IMPLEMENTATION (Replace with actual API call)
        mock_results = {
            "benefits of AI in healthcare": [
                {"url": "<https://example.com/ai-healthcare-benefits>", "title": "AI Benefits in Healthcare", "snippet": "AI revolutionizes diagnostics..."},
                {"url": "<https://example.com/ai-drug-discovery>", "title": "AI Speeds Up Drug Discovery", "snippet": "Machine learning models analyze..."},
            ],
            "risks of AI in healthcare": [
                {"url": "<https://example.com/ai-healthcare-risks>", "title": "Risks of AI in Medicine", "snippet": "Data privacy, algorithmic bias..."},
                {"url": "<https://example.com/ai-bias-healthcare>", "title": "Algorithmic Bias in AI Healthcare", "snippet": "AI models trained on biased data..."},
            ],
            "future of renewable energy": [
                {"url": "<https://example.com/renewable-future>", "title": "The Bright Future of Renewables", "snippet": "Solar, wind, and hydro power are set to dominate..."},
                {"url": "<https://example.com/energy-storage-solutions>", "title": "Solving Energy Storage", "snippet": "Battery technology and grid-scale storage are key..."},
            ]
        }
        results = mock_results.get(query.lower(), [])
        return results[:num_results]

    @tframex_app.tool(description="Fetches the main text content from a given URL.")
    async def fetch_page_content_tool(url: str) -> str:
        logger.info(f"TOOL: fetch_page_content_tool called for URL: {url}")
        # MOCK IMPLEMENTATION (Replace with actual fetching and parsing)
        mock_content = {
            "<https://example.com/ai-healthcare-benefits>": "Detailed content about AI benefits in healthcare...",
            "<https://example.com/ai-drug-discovery>": "In-depth look at how AI accelerates drug discovery...",
            "<https://example.com/ai-healthcare-risks>": "Comprehensive discussion on ethical and practical risks...",
            "<https://example.com/ai-bias-healthcare>": "Focuses specifically on algorithmic bias...",
            "<https://example.com/renewable-future>": "Content about solar, wind, hydro, and their future prospects...",
            "<https://example.com/energy-storage-solutions>": "Discussion of battery tech, pumped hydro, and other storage solutions...",
        }
        if "nonexistent" in url:
            return "Error: Could not fetch content from URL."
        return mock_content.get(url, f"Mock content for {url}.")

    # --- Agent Definitions (Registered with the global tframex_app) ---
    @tframex_app.agent(
        name="QueryGeneratorAgent",
        description="Generates diverse and effective search queries based on a research topic.",
        system_prompt=(
            "You are a research assistant specializing in query formulation. "
            "Given a research topic: {research_topic}, generate 3-5 diverse and effective search engine queries. "
            "Output ONLY a JSON list of strings. Example: [\\"query1\\", \\"query2\\"]"
        ),
        strip_think_tags=True
    )
    async def query_generator_agent_func(): pass

    @tframex_app.agent(
        name="InformationRetrieverAgent",
        description="Uses web search and page fetching tools to gather relevant information.",
        system_prompt=(
            "You are an Information Retriever. Use 'web_search_tool' with provided queries, then 'fetch_page_content_tool' for promising URLs (max 2 per query). "
            "Input: JSON list of search queries. Output: JSON list of {{'url': str, 'full_content': str}}. "
            "Available tools: {available_tools_descriptions}"
        ),
        tools=["web_search_tool", "fetch_page_content_tool"],
        strip_think_tags=True
    )
    async def information_retriever_agent_func(): pass

    @tframex_app.agent(
        name="InsightExtractorAgent",
        description="Extracts key insights from provided text content relevant to a research topic.",
        system_prompt=(
            "You are an Insight Extractor. Given articles (list of {{'url', 'full_content'}}) and research_topic: {research_topic}, "
            "extract key insights, facts, arguments relevant to the topic. "
            "Output a JSON list, where each item is {{'url': str, 'extracted_insights': str (summary of insights)}}."
        ),
        llm=powerful_llm,
        strip_think_tags=True
    )
    async def insight_extractor_agent_func(): pass

    @tframex_app.agent(
        name="ReportSynthesizerAgent",
        description="Synthesizes extracted insights into a comprehensive research report.",
        system_prompt=(
            "You are a Research Report Writer. Given extracted insights (list of {{'url', 'extracted_insights'}}) for research_topic: {research_topic}, "
            "synthesize them into a comprehensive report (Intro, Key Findings, Analysis, Conclusion, Gaps). "
            "You are generating the final report content."
        ),
        llm=powerful_llm,
        strip_think_tags=True
    )
    async def report_synthesizer_agent_func(): pass

    # --- Flow Definition (Registered with the global tframex_app) ---
    deep_research_flow = Flow(
        flow_name="DeepResearchFlow",
        description="Orchestrates agents to perform deep research on a topic."
    )
    deep_research_flow.add_step("QueryGeneratorAgent")
    deep_research_flow.add_step("InformationRetrieverAgent")
    deep_research_flow.add_step("InsightExtractorAgent")
    deep_research_flow.add_step("ReportSynthesizerAgent")
    tframex_app.register_flow(deep_research_flow)

    logger.info("TFrameX components (tools, agents, flow) registered.")

# --- FastAPI Application ---
api = FastAPI(
    title="TFrameX Deep Research Chatbot",
    description="An API to conduct deep research on a given topic using a multi-agent TFrameX flow.",
    version="0.1.0"
)

# CORS Middleware (adjust origins as needed for your frontend)
api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Allows all origins for simplicity, restrict in production
    allow_credentials=True,
    allow_methods=["*"], # Allows all methods
    allow_headers=["*"], # Allows all headers
)

# Pydantic models for request and response
class ResearchRequest(BaseModel):
    topic: str
    user_id: str = "default_user" # Optional: for tracking or context

class ResearchResponse(BaseModel):
    report: str
    status: str = "success"
    # You could add more details like a list of sources or intermediate logs if needed
    # sources_consulted: List[str] = []
    # execution_log: List[str] = []

@api.on_event("startup")
async def startup_event():
    """Initializes TFrameX app when FastAPI starts."""
    logger.info("FastAPI application starting up...")
    initialize_tframex_app()
    if tframex_app is None:
        logger.error("TFrameX App failed to initialize. API might not function correctly.")
        # Consider raising an exception here to prevent FastAPI from starting if TFrameX is critical
    else:
        logger.info("TFrameX App initialized successfully for FastAPI.")

@api.post("/research", response_model=ResearchResponse)
async def conduct_research(request: ResearchRequest):
    """
    Endpoint to initiate a deep research task on a given topic.
    """
    logger.info(f"Received research request for topic: '{request.topic}' from user: {request.user_id}")

    if tframex_app is None:
        logger.error("TFrameX app is not initialized. Cannot process request.")
        raise HTTPException(status_code=503, detail="Research service is temporarily unavailable. TFrameX not initialized.")

    try:
        async with tframex_app.run_context() as rt:
            # The initial message content can be minimal if the first agent primarily uses template_vars
            initial_message = Message(
                role="user",
                content=f"Initiate research on: {request.topic}"
            )

            logger.info(f"Running TFrameX 'DeepResearchFlow' for topic: {request.topic}")
            flow_context = await rt.run_flow(
                "DeepResearchFlow",
                initial_message,
                flow_template_vars={"research_topic": request.topic},
                # You could also pass user_id into shared_data if agents need it
                # initial_shared_data={"user_id": request.user_id}
            )

            final_report = flow_context.current_message.content
            logger.info(f"Research completed for topic: '{request.topic}'. Report length: {len(final_report)}")

            # For debugging, you might want to log the history
            # execution_log_summary = []
            # for i, msg_record in enumerate(flow_context.history):
            #     log_entry = f"Step {i}: Agent: {msg_record.agent_name or 'Input'}, Role: {msg_record.message.role}"
            #     if msg_record.message.tool_calls:
            #         log_entry += f", Tools: {[tc.function.name for tc in msg_record.message.tool_calls]}"
            #     execution_log_summary.append(log_entry)
            # logger.debug(f"Flow execution log for '{request.topic}': {execution_log_summary}")

            return ResearchResponse(report=final_report)

    except RuntimeError as e: # Catch specific errors you expect
        logger.error(f"Runtime error during research for topic '{request.topic}': {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"A runtime error occurred: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error during research for topic '{request.topic}': {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An unexpected error occurred while processing your research request.")

@api.get("/")
async def root():
    return {"message": "TFrameX Deep Research Chatbot API is running. POST to /research with a 'topic'."}

# --- Main block for Uvicorn (if running this file directly) ---
if __name__ == "__main__":
    import uvicorn
    # Initialize TFrameX app before starting Uvicorn if not using startup event
    # initialize_tframex_app() # Can be redundant if startup_event is reliable

    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", 8000))
    reload = os.getenv("RELOAD_APP", "false").lower() == "true"

    logger.info(f"Starting Uvicorn server on {host}:{port} with reload: {reload}")
    uvicorn.run("main:api", host=host, port=port, reload=reload)