From 52721bf96a5d3930d5f3b1eb3f57cce676e1334e Mon Sep 17 00:00:00 2001 From: Kaustav Ghosh Date: Wed, 22 Oct 2025 21:17:19 +0530 Subject: [PATCH] feat: implement router-based architecture for flight search agent - Introduced a router node to classify user queries and route them to specialized nodes for flight lookup, booking, viewing bookings, and searching airline reviews. - Added functions for extracting airport codes and booking details, enhancing the agent's ability to process user requests effectively. - Updated the flight search assistant prompt to reflect the new router-based architecture and improved task handling. - Enhanced logging throughout the workflow for better traceability and debugging. --- .gitignore | 2 +- couchbase-infrastructure | 1 + .../flight_search_agent_tutorial.ipynb | 481 +++++++++++++----- .../flight_search_agent_langraph/main.py | 355 +++++++++---- .../prompts/flight_search_assistant.yaml | 40 +- .../prompts/router_classifier.yaml | 44 ++ 6 files changed, 675 insertions(+), 248 deletions(-) create mode 160000 couchbase-infrastructure create mode 100644 notebooks/flight_search_agent_langraph/prompts/router_classifier.yaml diff --git a/.gitignore b/.gitignore index c7231bf..30ebccf 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,7 @@ debug/ test_*.py test_*.ipynb debug_*.py -.mcp.json +.mcp.json.* # The quickstart folder .agent-catalog/ diff --git a/couchbase-infrastructure b/couchbase-infrastructure new file mode 160000 index 0000000..794fbb7 --- /dev/null +++ b/couchbase-infrastructure @@ -0,0 +1 @@ +Subproject commit 794fbb70998a8c033cefc16700bddd6206f17618 diff --git a/notebooks/flight_search_agent_langraph/flight_search_agent_tutorial.ipynb b/notebooks/flight_search_agent_langraph/flight_search_agent_tutorial.ipynb index b7e824b..6927f73 100644 --- a/notebooks/flight_search_agent_langraph/flight_search_agent_tutorial.ipynb +++ b/notebooks/flight_search_agent_langraph/flight_search_agent_tutorial.ipynb @@ -231,6 +231,53 @@ " logger.warning(f\"⚠️ Latest Capella AI LLM failed: {e}\")\n", " llm = None\n", "\n", + " # OPENAI FALLBACK\n", + " if not embeddings and os.getenv(\"OPENAI_API_KEY\"):\n", + " try:\n", + " if framework == \"llamaindex\":\n", + " from llama_index.embeddings.openai import OpenAIEmbedding\n", + " embeddings = OpenAIEmbedding(\n", + " api_key=os.getenv(\"OPENAI_API_KEY\"),\n", + " model_name=\"text-embedding-3-small\",\n", + " )\n", + " else: # langchain, langgraph\n", + " from langchain_openai import OpenAIEmbeddings\n", + " embeddings = OpenAIEmbeddings(\n", + " model=\"text-embedding-3-small\",\n", + " api_key=os.getenv(\"OPENAI_API_KEY\"),\n", + " base_url=os.getenv(\"OPENAI_API_ENDPOINT\"),\n", + " )\n", + " logger.info(\"\u2705 Using OpenAI embeddings fallback\")\n", + " except Exception as e:\n", + " logger.warning(f\"\u26a0\ufe0f OpenAI embeddings failed: {e}\")\n", + "\n", + " if not llm and os.getenv(\"OPENAI_API_KEY\"):\n", + " try:\n", + " if framework == \"llamaindex\":\n", + " from llama_index.llms.openai_like import OpenAILike\n", + " llm = OpenAILike(\n", + " model=\"gpt-4o\",\n", + " api_key=os.getenv(\"OPENAI_API_KEY\"),\n", + " is_chat_model=True,\n", + " temperature=temperature,\n", + " )\n", + " else: # langchain, langgraph\n", + " from langchain_openai import ChatOpenAI\n", + "\n", + " chat_kwargs = {\n", + " \"api_key\": os.getenv(\"OPENAI_API_KEY\"),\n", + " \"model\": \"gpt-4o\",\n", + " \"temperature\": temperature,\n", + " }\n", + " if callbacks:\n", + " chat_kwargs[\"callbacks\"] = callbacks\n", + "\n", + " llm = ChatOpenAI(**chat_kwargs)\n", + "\n", + " logger.info(\"\u2705 Using OpenAI LLM fallback\")\n", + " except Exception as e:\n", + " logger.warning(f\"\u26a0\ufe0f OpenAI LLM failed: {e}\")\n", + "\n", " # VALIDATION\n", " if not embeddings:\n", " raise ValueError(\"❌ No embeddings service could be initialized\")\n", @@ -580,6 +627,40 @@ " except Exception as e:\n", " raise RuntimeError(f\"❌ Error setting up LangChain vector store: {e!s}\")\n", "\n", + " def get_collection(self, scope_name: str, collection_name: str, auto_create: bool = False):\n", + " \"\"\"Get a collection object with optional auto-creation.\"\"\"\n", + " collection_key = f\"{scope_name}.{collection_name}\"\n", + " \n", + " if collection_key not in self._collections:\n", + " if auto_create:\n", + " self.setup_collection(scope_name, collection_name, clear_existing_data=False)\n", + " else:\n", + " if not self.bucket:\n", + " self.setup_bucket()\n", + " self._collections[collection_key] = self.bucket.scope(scope_name).collection(collection_name)\n", + " \n", + " return self._collections[collection_key]\n", + "\n", + " def disconnect(self):\n", + " \"\"\"Clean disconnect from Couchbase cluster.\"\"\"\n", + " try:\n", + " if self.cluster:\n", + " self._collections.clear()\n", + " self.bucket = None\n", + " self.cluster = None\n", + " logger.info(\"\u2705 Disconnected from Couchbase\")\n", + " except Exception as e:\n", + " logger.warning(f\"\u26a0\ufe0f Error during disconnect: {e}\")\n", + "\n", + " def __enter__(self):\n", + " \"\"\"Context manager entry - establish connection.\"\"\"\n", + " self.connect()\n", + " return self\n", + "\n", + " def __exit__(self, exc_type, exc_val, exc_tb):\n", + " \"\"\"Context manager exit - clean disconnect.\"\"\"\n", + " self.disconnect()\n", + "\n", "\n", "def create_couchbase_client(\n", " conn_string: str = None,\n", @@ -1332,7 +1413,7 @@ "outputs": [], "source": [ "\n", - "## Agent Classes\n", + "## Agent Classes and Router-Based Architecture\n", "\n", "class FlightSearchState(agentc_langgraph.agent.State):\n", " \"\"\"State for flight search conversations - single user system.\"\"\"\n", @@ -1340,49 +1421,41 @@ " query: str\n", " resolved: bool\n", " search_results: list[dict]\n", + " route_decision: str # Router's classification: 'lookup', 'book', 'view', 'reviews'\n", "\n", "\n", - "class FlightSearchAgent(agentc_langgraph.agent.ReActAgent):\n", - " \"\"\"Flight search agent using Agent Catalog tools and ReActAgent framework.\"\"\"\n", + "# ============================================================================\n", + "# Helper Functions for Parameter Extraction\n", + "# ============================================================================\n", "\n", - " def __init__(self, catalog: agentc.Catalog, span: agentc.Span, chat_model=None):\n", - " \"\"\"Initialize the flight search agent.\"\"\"\n", "\n", - " if chat_model is None:\n", - " # Fallback to OpenAI if no chat model provided\n", - " model_name = os.getenv(\"OPENAI_MODEL\", \"gpt-4o-mini\")\n", - " chat_model = langchain_openai.chat_models.ChatOpenAI(model=model_name, temperature=0.1)\n", + "def extract_airports(query: str) -> dict:\n", + " \"\"\"Extract source and destination airports from query using regex. Fails fast if not found.\"\"\"\n", + " import re\n", "\n", - " super().__init__(\n", - " chat_model=chat_model, catalog=catalog, span=span, prompt_name=\"flight_search_assistant\"\n", - " )\n", + " # ReAct-style logging for extraction\n", + " logger.info(\"Thought: I need to extract airport codes from the query using regex pattern matching\")\n", + " logger.info(\"Action: extract_airports (regex pattern: \\\\b([A-Z]{3})\\\\b)\")\n", + " logger.info(f\"Action Input: {query}\")\n", "\n", - " def _invoke(\n", - " self,\n", - " span: agentc.Span,\n", - " state: FlightSearchState,\n", - " config: langchain_core.runnables.RunnableConfig,\n", - " ) -> FlightSearchState:\n", - " \"\"\"Handle flight search conversation using ReActAgent.\"\"\"\n", - "\n", - " # Initialize conversation if this is the first message\n", - " if not state[\"messages\"]:\n", - " initial_msg = langchain_core.messages.HumanMessage(content=state[\"query\"])\n", - " state[\"messages\"].append(initial_msg)\n", - " logger.info(f\"Flight Query: {state['query']}\")\n", - "\n", - " # Get prompt resource first - we'll need it for the ReAct agent\n", - " prompt_resource = self.catalog.find(\"prompt\", name=\"flight_search_assistant\")\n", - "\n", - " # Get tools from Agent Catalog with simplified discovery\n", - " tools = []\n", - " tool_names = [\n", - " \"lookup_flight_info\",\n", - " \"save_flight_booking\",\n", - " \"retrieve_flight_bookings\",\n", - " \"search_airline_reviews\",\n", - " ]\n", + " # Extract 3-letter airport codes (e.g., 'JFK to LAX', 'from JFK to LAX')\n", + " airport_pattern = r'\\b([A-Z]{3})\\b'\n", + " airports = re.findall(airport_pattern, query.upper())\n", + "\n", + " if len(airports) >= 2:\n", + " result = {'source': airports[0], 'dest': airports[1]}\n", + " logger.info(f\"Observation: Successfully extracted - source_airport: {result['source']}, destination_airport: {result['dest']}\")\n", + " return result\n", "\n", + " # Fail fast - no fallbacks\n", + " logger.error(f\"Observation: Failed to extract airport codes from query\")\n", + " raise ValueError(\n", + " f\"Could not extract airport codes from query: '{query}'. \"\n", + " f\"Please provide clear 3-letter airport codes (e.g., 'JFK to LAX' or 'Find flights from JFK to LAX')\"\n", + " )\n", + "\n", + "\n", +<<<<<<< Updated upstream " for tool_name in tool_names:\n", " try:\n", " # Find tool using Agent Catalog\n", @@ -1396,39 +1469,47 @@ " except Exception as e:\n", " logger.error(f\"❌ Failed to find tool {tool_name}: {e}\")\n", " continue\n", +======= + "def extract_booking_details(query: str) -> str:\n", + " \"\"\"Extract booking details from natural language and format for tool.\"\"\"\n", + " # The save_flight_booking tool already handles natural language well\n", + " # Just pass the query as-is, it will extract what it needs\n", + " return query\n", + "\n", +>>>>>>> Stashed changes "\n", - " # Create wrapper function to handle proper parameter parsing\n", - " def create_tool_wrapper(original_tool, name):\n", - " \"\"\"Create a wrapper for Agent Catalog tools with robust input handling.\"\"\"\n", + "# ============================================================================\n", + "# Router Node - Intent Classification\n", + "# ============================================================================\n", "\n", +<<<<<<< Updated upstream " def wrapper_func(tool_input: str) -> str:\n", " \"\"\"Wrapper function that handles input parsing and error handling.\"\"\"\n", " try:\n", " logger.info(f\"🔧 Tool {name} called with raw input: {repr(tool_input)}\")\n", +======= +>>>>>>> Stashed changes "\n", - " # Robust input sanitization to handle ReAct format artifacts\n", - " if isinstance(tool_input, str):\n", - " # Remove ReAct format artifacts that get mixed into input\n", - " clean_input = tool_input.strip()\n", + "def create_router_node(llm, catalog: agentc.Catalog):\n", + " \"\"\"Create a router node function using Agent Catalog prompt.\"\"\"\n", "\n", - " # Remove common ReAct artifacts\n", - " artifacts_to_remove = [\n", - " '\\nObservation', 'Observation', '\\nThought:', 'Thought:',\n", - " '\\nAction:', 'Action:', '\\nAction Input:', 'Action Input:',\n", - " '\\nFinal Answer:', 'Final Answer:'\n", - " ]\n", + " def router_node(state: FlightSearchState) -> FlightSearchState:\n", + " \"\"\"Classify user intent and set routing decision.\"\"\"\n", "\n", - " for artifact in artifacts_to_remove:\n", - " if artifact in clean_input:\n", - " clean_input = clean_input.split(artifact)[0]\n", + " # ReAct-style logging: Router classification\n", + " logger.info(\"Thought: I need to classify this query to route it to the correct specialized handler\")\n", + " logger.info(\"Action: router_classifier\")\n", + " logger.info(f\"Action Input: {state['query']}\")\n", "\n", - " # Clean up quotes and whitespace\n", - " clean_input = clean_input.strip().strip(\"\\\"'\").strip()\n", - " # Normalize whitespace\n", - " clean_input = \" \".join(clean_input.split())\n", + " # Load classification prompt from Agent Catalog\n", + " prompt_resource = catalog.find(\"prompt\", name=\"router_classifier\")\n", + " classification_prompt = prompt_resource.content\n", "\n", - " tool_input = clean_input\n", + " # Invoke LLM for classification (state is a dict in LangGraph)\n", + " response = llm.invoke(classification_prompt.format(query=state[\"query\"]))\n", + " decision = response.content.strip().lower()\n", "\n", +<<<<<<< Updated upstream " logger.info(f\"🧹 Tool {name} cleaned input: {repr(tool_input)}\")\n", "\n", " # Call appropriate tool with proper parameter handling\n", @@ -1510,74 +1591,174 @@ " name=tool_name,\n", " description=tool_descriptions.get(tool_name, f\"Tool for {tool_name.replace('_', ' ')}\"),\n", " func=create_tool_wrapper(catalog_tool, tool_name),\n", +======= + " # Validate decision - fail fast if invalid\n", + " valid_categories = [\"lookup\", \"book\", \"view\", \"reviews\"]\n", + " if decision not in valid_categories:\n", + " raise ValueError(\n", + " f\"Router returned invalid classification: '{decision}'. \"\n", + " f\"Expected one of: {valid_categories}. \"\n", + " f\"Query was: '{state['query']}'\"\n", +>>>>>>> Stashed changes " )\n", - " tools.append(langchain_tool)\n", - "\n", - " # Use the Agent Catalog prompt content directly - get first result if it's a list\n", - " if isinstance(prompt_resource, list):\n", - " prompt_resource = prompt_resource[0]\n", - "\n", - " # Safely get the content from the prompt resource\n", - " prompt_content = getattr(prompt_resource, \"content\", \"\")\n", - " if not prompt_content:\n", - " prompt_content = \"You are a helpful flight search assistant. Use the available tools to help users with their flight queries.\"\n", - "\n", - " # Inject current date into the prompt content\n", - " import datetime\n", - "\n", - " current_date = datetime.date.today().strftime(\"%Y-%m-%d\")\n", - " prompt_content = prompt_content.replace(\"{current_date}\", current_date)\n", - "\n", - " # Use the Agent Catalog prompt content directly - it already has ReAct format\n", - " react_prompt = PromptTemplate.from_template(str(prompt_content))\n", - "\n", - " # Create ReAct agent with tools and prompt\n", - " agent = create_react_agent(self.chat_model, tools, react_prompt)\n", - "\n", - " # Custom parsing error handler - force stopping on parsing errors\n", - " def handle_parsing_errors(error):\n", - " \"\"\"Custom handler for parsing errors - force early termination.\"\"\"\n", - " error_msg = str(error)\n", - " if \"both a final answer and a parse-able action\" in error_msg:\n", - " # Force early termination - return a reasonable response\n", - " return \"Final Answer: I encountered a parsing error. Please reformulate your request.\"\n", - " elif \"Missing 'Action:'\" in error_msg:\n", - " return \"I need to use the correct format with Action: and Action Input:\"\n", - " else:\n", - " return f\"Final Answer: I encountered an error processing your request. Please try again.\"\n", - "\n", - " # Create agent executor - very strict: only 2 iterations max\n", - " agent_executor = AgentExecutor(\n", - " agent=agent,\n", - " tools=tools,\n", - " verbose=True,\n", - " handle_parsing_errors=handle_parsing_errors,\n", - " max_iterations=2, # STRICT: 1 tool call + 1 Final Answer only\n", - " early_stopping_method=\"force\", # Force stop\n", - " return_intermediate_steps=True,\n", + "\n", + " state[\"route_decision\"] = decision\n", + " logger.info(f\"Observation: Classified as '{decision}' (routing to {decision}_flights/bookings/reviews node)\")\n", + "\n", + " return state\n", + "\n", + " return router_node\n", + "\n", + "\n", + "# ============================================================================\n", + "# Specialized Node Functions - Direct Tool Calls\n", + "# ============================================================================\n", + "\n", + "\n", + "def create_lookup_flights_node(catalog: agentc.Catalog):\n", + " \"\"\"Create a node for looking up available flights.\"\"\"\n", + "\n", + " def lookup_flights_node(state: FlightSearchState) -> FlightSearchState:\n", + " \"\"\"Handle flight lookup queries with direct tool invocation. Fails fast on errors.\"\"\"\n", + " logger.info(f\"\u2708\ufe0f Lookup node processing: {state['query']}\")\n", + "\n", + " # Extract airports from query (fails fast if not found)\n", + " # This will log its own Thought/Action/Observation\n", + " airports = extract_airports(state[\"query\"])\n", + "\n", + " # ReAct-style logging: Tool invocation\n", + " logger.info(\"Thought: Now I'll search for available flights between these airports\")\n", + " logger.info(\"Action: lookup_flight_info\")\n", + " logger.info(f\"Action Input: source_airport={airports['source']}, destination_airport={airports['dest']}\")\n", + "\n", + " # Get tool and call directly via Agent Catalog\n", + " tool = catalog.find(\"tool\", name=\"lookup_flight_info\")\n", + " response = tool.func(\n", + " source_airport=airports[\"source\"],\n", + " destination_airport=airports[\"dest\"]\n", " )\n", "\n", - " # Execute the agent\n", - " response = agent_executor.invoke({\"input\": state[\"query\"]})\n", - "\n", - " # Extract tool outputs from intermediate_steps and store in search_results\n", - " if \"intermediate_steps\" in response and response[\"intermediate_steps\"]:\n", - " tool_outputs = []\n", - " for step in response[\"intermediate_steps\"]:\n", - " if isinstance(step, tuple) and len(step) >= 2:\n", - " # step[0] is the action, step[1] is the tool output/observation\n", - " tool_output = str(step[1])\n", - " if tool_output and tool_output.strip():\n", - " tool_outputs.append(tool_output)\n", - " state[\"search_results\"] = tool_outputs\n", - "\n", - " # Add response to conversation\n", - " assistant_msg = langchain_core.messages.AIMessage(content=response[\"output\"])\n", - " state[\"messages\"].append(assistant_msg)\n", + " # Show full observation\n", + " logger.info(f\"Observation: {response}\")\n", + "\n", + " # Update state (state is a dict in LangGraph)\n", + " state[\"messages\"].append(langchain_core.messages.AIMessage(content=response))\n", + " state[\"search_results\"] = [response]\n", " state[\"resolved\"] = True\n", + " logger.info(\"\u2705 Lookup node completed successfully\")\n", "\n", " return state\n", "\n", + " return lookup_flights_node\n", + "\n", + "\n", + "def create_book_flight_node(catalog: agentc.Catalog):\n", + " \"\"\"Create a node for booking flights.\"\"\"\n", + "\n", + " def book_flight_node(state: FlightSearchState) -> FlightSearchState:\n", + " \"\"\"Handle flight booking queries with direct tool invocation. Fails fast on errors.\"\"\"\n", + " logger.info(f\"\ud83d\udcdd Book node processing: {state['query']}\")\n", + "\n", + " # ReAct-style logging: Extraction\n", + " logger.info(\"Thought: I need to extract booking details from the query\")\n", + " logger.info(\"Action: extract_booking_details\")\n", + " logger.info(f\"Action Input: {state['query']}\")\n", + "\n", + " # Extract booking details\n", + " booking_input = extract_booking_details(state[\"query\"])\n", + " logger.info(f\"Observation: Extracted booking details: {booking_input}\")\n", + "\n", + " # ReAct-style logging: Tool invocation\n", + " logger.info(\"Thought: Now I'll create the flight booking with these details\")\n", + " logger.info(\"Action: save_flight_booking\")\n", + " logger.info(f\"Action Input: {booking_input}\")\n", + "\n", + " # Get tool and call directly via Agent Catalog\n", + " tool = catalog.find(\"tool\", name=\"save_flight_booking\")\n", + " response = tool.func(booking_input=booking_input)\n", + "\n", + " # Show full observation\n", + " logger.info(f\"Observation: {response}\")\n", + "\n", + " # Update state (state is a dict in LangGraph)\n", + " state[\"messages\"].append(langchain_core.messages.AIMessage(content=response))\n", + " state[\"search_results\"] = [response]\n", + " state[\"resolved\"] = True\n", + " logger.info(\"\u2705 Book node completed successfully\")\n", + "\n", + " return state\n", + "\n", + " return book_flight_node\n", + "\n", + "\n", + "def create_view_bookings_node(catalog: agentc.Catalog):\n", + " \"\"\"Create a node for viewing existing bookings.\"\"\"\n", + "\n", + " def view_bookings_node(state: FlightSearchState) -> FlightSearchState:\n", + " \"\"\"Handle view bookings queries with direct tool invocation. Fails fast on errors.\"\"\"\n", + " logger.info(f\"\ud83d\udc40 View node processing: {state['query']}\")\n", + "\n", + " # ReAct-style logging: Tool invocation\n", + " logger.info(\"Thought: I'll retrieve all current flight bookings for the user\")\n", + " logger.info(\"Action: retrieve_flight_bookings\")\n", + " logger.info(\"Action Input: booking_query='' (empty string to get all bookings)\")\n", + "\n", + " # Get tool and call with empty query to get all bookings via Agent Catalog\n", + " tool = catalog.find(\"tool\", name=\"retrieve_flight_bookings\")\n", + " response = tool.func(booking_query=\"\")\n", + "\n", + " # Show full observation\n", + " logger.info(f\"Observation: {response}\")\n", + "\n", + " # Update state (state is a dict in LangGraph)\n", + " state[\"messages\"].append(langchain_core.messages.AIMessage(content=response))\n", + " state[\"search_results\"] = [response]\n", + " state[\"resolved\"] = True\n", + " logger.info(\"\u2705 View node completed successfully\")\n", + "\n", + " return state\n", + "\n", + " return view_bookings_node\n", + "\n", + "\n", + "def create_search_reviews_node(catalog: agentc.Catalog):\n", + " \"\"\"Create a node for searching airline reviews.\"\"\"\n", + "\n", + " def search_reviews_node(state: FlightSearchState) -> FlightSearchState:\n", + " \"\"\"Handle airline review search queries with direct tool invocation. Fails fast on errors.\"\"\"\n", + " logger.info(f\"\u2b50 Reviews node processing: {state['query']}\")\n", + "\n", + " # Use the query as-is for searching reviews\n", + " # The tool expects natural language like 'SpiceJet service quality'\n", + " search_query = state[\"query\"]\n", + "\n", + " # ReAct-style logging: Tool invocation\n", + " logger.info(\"Thought: I'll search for airline reviews using vector similarity search\")\n", + " logger.info(\"Action: search_airline_reviews\")\n", + " logger.info(f\"Action Input: query='{search_query}'\")\n", + "\n", + " # Get tool and call directly via Agent Catalog\n", + " tool = catalog.find(\"tool\", name=\"search_airline_reviews\")\n", + " response = tool.func(query=search_query)\n", + "\n", + " # Show full observation\n", + " logger.info(f\"Observation: {response}\")\n", + "\n", + " # Update state (state is a dict in LangGraph)\n", + " state[\"messages\"].append(langchain_core.messages.AIMessage(content=response))\n", + " state[\"search_results\"] = [response]\n", + " state[\"resolved\"] = True\n", + " logger.info(\"\u2705 Reviews node completed successfully\")\n", + "\n", + " return state\n", + "\n", + " return search_reviews_node\n", + "\n", + "\n", + "# ============================================================================\n", + "# FlightSearchGraph - Router-Based Architecture\n", + "# ============================================================================\n", + "\n", "\n", "class FlightSearchGraph(agentc_langgraph.graph.GraphRunnable):\n", " \"\"\"Flight search conversation graph using Agent Catalog.\"\"\"\n", @@ -1595,36 +1776,58 @@ " query=query,\n", " resolved=False,\n", " search_results=[],\n", + " route_decision=\\\"\\\", # Will be set by router\n", " )\n", "\n", " def compile(self):\n", - " \"\"\"Compile the LangGraph workflow.\"\"\"\n", + " \"\"\"Compile the LangGraph workflow with router-based architecture.\"\"\"\n", "\n", - " # Build the flight search agent with catalog integration\n", - " search_agent = FlightSearchAgent(\n", - " catalog=self.catalog, span=self.span, chat_model=self.chat_model\n", - " )\n", + " # Create specialized node functions using Agent Catalog\n", + " router = create_router_node(self.chat_model, self.catalog)\n", + " lookup_node = create_lookup_flights_node(self.catalog)\n", + " book_node = create_book_flight_node(self.catalog)\n", + " view_node = create_view_bookings_node(self.catalog)\n", + " reviews_node = create_search_reviews_node(self.catalog)\n", "\n", - " # Create a wrapper function for the ReActAgent\n", - " def flight_search_node(state: FlightSearchState) -> FlightSearchState:\n", - " \"\"\"Wrapper function for the flight search ReActAgent.\"\"\"\n", - " return search_agent._invoke(\n", - " span=self.span,\n", - " state=state,\n", - " config={}, # Empty config for now\n", - " )\n", + " # Define routing logic based on classification\n", + " def route_query(state: FlightSearchState) -> str:\n", + " \"\"\"Route to appropriate node based on classification.\"\"\"\n", + " return state[\"route_decision\"]\n", "\n", - " # Create a simple workflow graph for flight search\n", + " # Build the graph\n", " workflow = langgraph.graph.StateGraph(FlightSearchState)\n", "\n", - " # Add the flight search agent node using the wrapper function\n", - " workflow.add_node(\"flight_search\", flight_search_node)\n", + " # Add all nodes\n", + " workflow.add_node(\"router\", router)\n", + " workflow.add_node(\"lookup_flights\", lookup_node)\n", + " workflow.add_node(\"book_flight\", book_node)\n", + " workflow.add_node(\"view_bookings\", view_node)\n", + " workflow.add_node(\"search_reviews\", reviews_node)\n", + "\n", + " # Set entry point to router\n", + " workflow.set_entry_point(\"router\")\n", + "\n", + " # Add conditional edges from router to specialized nodes\n", + " workflow.add_conditional_edges(\n", + " \"router\",\n", + " route_query,\n", + " {\n", + " \"lookup\": \"lookup_flights\",\n", + " \"book\": \"book_flight\",\n", + " \"view\": \"view_bookings\",\n", + " \"reviews\": \"search_reviews\",\n", + " },\n", + " )\n", "\n", - " # Set entry point and simple flow\n", - " workflow.set_entry_point(\"flight_search\")\n", - " workflow.add_edge(\"flight_search\", langgraph.graph.END)\n", + " # All specialized nodes end after execution\n", + " workflow.add_edge(\"lookup_flights\", langgraph.graph.END)\n", + " workflow.add_edge(\"book_flight\", langgraph.graph.END)\n", + " workflow.add_edge(\"view_bookings\", langgraph.graph.END)\n", + " workflow.add_edge(\"search_reviews\", langgraph.graph.END)\n", "\n", - " return workflow.compile()\n" + " logger.info(\"✅ Router-based graph compiled successfully\")\n", + " return workflow.compile()\n", + "\n" ] }, { diff --git a/notebooks/flight_search_agent_langraph/main.py b/notebooks/flight_search_agent_langraph/main.py index 808880a..dd22df6 100644 --- a/notebooks/flight_search_agent_langraph/main.py +++ b/notebooks/flight_search_agent_langraph/main.py @@ -17,16 +17,11 @@ import agentc_langgraph.graph import dotenv import langchain_core.messages -import langchain_core.runnables -import langchain_openai.chat_models import langgraph.graph from couchbase.auth import PasswordAuthenticator from couchbase.cluster import Cluster from couchbase.exceptions import KeyspaceNotFoundException from couchbase.options import ClusterOptions -from langchain.agents import AgentExecutor, create_react_agent -from langchain_core.prompts import PromptTemplate -from langchain_core.tools import Tool from pydantic import SecretStr @@ -76,77 +71,56 @@ class FlightSearchState(agentc_langgraph.agent.State): query: str resolved: bool search_results: list[dict] + route_decision: str # Router's classification: "lookup", "book", "view", "reviews" -class FlightSearchAgent(agentc_langgraph.agent.ReActAgent): - """Flight search agent using Agent Catalog tools and ReActAgent framework.""" +# ============================================================================ +# Helper Functions for Parameter Extraction +# ============================================================================ - def __init__(self, catalog: agentc.Catalog, span: agentc.Span, chat_model=None): - """Initialize the flight search agent.""" - if chat_model is None: - # Fallback to OpenAI if no chat model provided - model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini") - chat_model = langchain_openai.chat_models.ChatOpenAI(model=model_name, temperature=0.1) +def extract_airports(query: str) -> dict: + """Extract source and destination airports from query using regex. Fails fast if not found.""" + import re - super().__init__( - chat_model=chat_model, catalog=catalog, span=span, prompt_name="flight_search_assistant" - ) + # ReAct-style logging for extraction + logger.info("Thought: I need to extract airport codes from the query using regex pattern matching") + logger.info("Action: extract_airports (regex pattern: \\b([A-Z]{3})\\b)") + logger.info(f"Action Input: {query}") - def _invoke( - self, - span: agentc.Span, - state: FlightSearchState, - config: langchain_core.runnables.RunnableConfig, - ) -> FlightSearchState: - """Handle flight search conversation using ReActAgent.""" - - # Initialize conversation if this is the first message - if not state["messages"]: - initial_msg = langchain_core.messages.HumanMessage(content=state["query"]) - state["messages"].append(initial_msg) - logger.info(f"Flight Query: {state['query']}") - - # Get prompt resource first - we'll need it for the ReAct agent - prompt_resource = self.catalog.find("prompt", name="flight_search_assistant") - - # Get tools from Agent Catalog with simplified discovery - tools = [] - tool_names = [ - "lookup_flight_info", - "save_flight_booking", - "retrieve_flight_bookings", - "search_airline_reviews", - ] + # Extract 3-letter airport codes (e.g., "JFK to LAX", "from JFK to LAX", "Find flights JFK LAX") + airport_pattern = r'\b([A-Z]{3})\b' + airports = re.findall(airport_pattern, query.upper()) - for tool_name in tool_names: - try: - # Find tool using Agent Catalog - catalog_tool = self.catalog.find("tool", name=tool_name) - if catalog_tool: - logger.info(f"✅ Found tool: {tool_name}") - else: - logger.error(f"❌ Tool not found: {tool_name}") - continue + if len(airports) >= 2: + result = {"source": airports[0], "dest": airports[1]} + logger.info(f"Observation: Successfully extracted - source_airport: {result['source']}, destination_airport: {result['dest']}") + return result + + # Fail fast - no fallbacks + logger.error(f"Observation: Failed to extract airport codes from query") + raise ValueError( + f"Could not extract airport codes from query: '{query}'. " + f"Please provide clear 3-letter airport codes (e.g., 'JFK to LAX' or 'Find flights from JFK to LAX')" + ) + + +def extract_booking_details(query: str) -> str: + """Extract booking details from natural language and format for tool.""" + # The save_flight_booking tool already handles natural language well + # Just pass the query as-is, it will extract what it needs + return query - except Exception as e: - logger.error(f"❌ Failed to find tool {tool_name}: {e}") - continue - # Create wrapper function to handle proper parameter parsing - def create_tool_wrapper(original_tool, name): - """Create a wrapper for Agent Catalog tools with robust input handling.""" +# ============================================================================ +# Router Node - Intent Classification +# ============================================================================ - def wrapper_func(tool_input: str) -> str: - """Wrapper function that handles input parsing and error handling.""" - try: - logger.info(f"🔧 Tool {name} called with raw input: {repr(tool_input)}") - # Enhanced input sanitization to handle ReAct format artifacts and duplications - if isinstance(tool_input, str): - # Remove ReAct format artifacts that get mixed into input - clean_input = tool_input.strip() +def create_router_node(llm, catalog: agentc.Catalog): + """Create a router node function using Agent Catalog prompt.""" +<<<<<<< Updated upstream # Remove common ReAct artifacts artifacts_to_remove = [ '\nObservation', 'Observation', '\nThought:', 'Thought:', @@ -157,20 +131,25 @@ def wrapper_func(tool_input: str) -> str: for artifact in artifacts_to_remove: if artifact in clean_input: clean_input = clean_input.split(artifact)[0] +======= + def router_node(state: FlightSearchState) -> FlightSearchState: + """Classify user intent and set routing decision.""" - # Clean up quotes and whitespace - clean_input = clean_input.strip().strip("\"'").strip() + # ReAct-style logging: Router classification + logger.info("Thought: I need to classify this query to route it to the correct specialized handler") + logger.info("Action: router_classifier") + logger.info(f"Action Input: {state['query']}") +>>>>>>> Stashed changes - # Fix common duplication patterns (e.g., "JFK,LAX LAX" -> "JFK,LAX") - words = clean_input.split() - if len(words) > 1: - # Remove duplicate consecutive words - cleaned_words = [words[0]] - for word in words[1:]: - if word != cleaned_words[-1]: - cleaned_words.append(word) - clean_input = " ".join(cleaned_words) + # Load classification prompt from Agent Catalog + prompt_resource = catalog.find("prompt", name="router_classifier") + classification_prompt = prompt_resource.content + # Invoke LLM for classification (state is a dict in LangGraph) + response = llm.invoke(classification_prompt.format(query=state["query"])) + decision = response.content.strip().lower() + +<<<<<<< Updated upstream # For airport code patterns, fix duplications like "JFK,LAX LAX" if "," in clean_input and len(clean_input.split()) > 1: parts = clean_input.split(",") @@ -274,9 +253,18 @@ def wrapper_func(tool_input: str) -> str: name=tool_name, description=tool_descriptions.get(tool_name, f"Tool for {tool_name.replace('_', ' ')}"), func=create_tool_wrapper(catalog_tool, tool_name), +======= + # Validate decision - fail fast if invalid + valid_categories = ["lookup", "book", "view", "reviews"] + if decision not in valid_categories: + raise ValueError( + f"Router returned invalid classification: '{decision}'. " + f"Expected one of: {valid_categories}. " + f"Query was: '{state['query']}'" +>>>>>>> Stashed changes ) - tools.append(langchain_tool) +<<<<<<< Updated upstream # Use the Agent Catalog prompt content directly - get first result if it's a list if isinstance(prompt_resource, list): prompt_resource = prompt_resource[0] @@ -339,9 +327,165 @@ def handle_parsing_errors(error): assistant_msg = langchain_core.messages.AIMessage(content=response["output"]) state["messages"].append(assistant_msg) state["resolved"] = True +======= + state["route_decision"] = decision + logger.info(f"Observation: Classified as '{decision}' (routing to {decision}_flights/bookings/reviews node)") +>>>>>>> Stashed changes + + return state + + return router_node + + +# ============================================================================ +# Specialized Node Functions - Direct Tool Calls +# ============================================================================ + + +def create_lookup_flights_node(catalog: agentc.Catalog): + """Create a node for looking up available flights.""" + + def lookup_flights_node(state: FlightSearchState) -> FlightSearchState: + """Handle flight lookup queries with direct tool invocation. Fails fast on errors.""" + logger.info(f"✈️ Lookup node processing: {state['query']}") + + # Extract airports from query (fails fast if not found) + # This will log its own Thought/Action/Observation + airports = extract_airports(state["query"]) + + # ReAct-style logging: Tool invocation + logger.info("Thought: Now I'll search for available flights between these airports") + logger.info("Action: lookup_flight_info") + logger.info(f"Action Input: source_airport={airports['source']}, destination_airport={airports['dest']}") + + # Get tool and call directly via Agent Catalog + tool = catalog.find("tool", name="lookup_flight_info") + response = tool.func( + source_airport=airports["source"], + destination_airport=airports["dest"] + ) + + # Show full observation + logger.info(f"Observation: {response}") + + # Update state (state is a dict in LangGraph) + state["messages"].append(langchain_core.messages.AIMessage(content=response)) + state["search_results"] = [response] + state["resolved"] = True + logger.info("✅ Lookup node completed successfully") return state + return lookup_flights_node + + +def create_book_flight_node(catalog: agentc.Catalog): + """Create a node for booking flights.""" + + def book_flight_node(state: FlightSearchState) -> FlightSearchState: + """Handle flight booking queries with direct tool invocation. Fails fast on errors.""" + logger.info(f"📝 Book node processing: {state['query']}") + + # ReAct-style logging: Extraction + logger.info("Thought: I need to extract booking details from the query") + logger.info("Action: extract_booking_details") + logger.info(f"Action Input: {state['query']}") + + # Extract booking details + booking_input = extract_booking_details(state["query"]) + logger.info(f"Observation: Extracted booking details: {booking_input}") + + # ReAct-style logging: Tool invocation + logger.info("Thought: Now I'll create the flight booking with these details") + logger.info("Action: save_flight_booking") + logger.info(f"Action Input: {booking_input}") + + # Get tool and call directly via Agent Catalog + tool = catalog.find("tool", name="save_flight_booking") + response = tool.func(booking_input=booking_input) + + # Show full observation + logger.info(f"Observation: {response}") + + # Update state (state is a dict in LangGraph) + state["messages"].append(langchain_core.messages.AIMessage(content=response)) + state["search_results"] = [response] + state["resolved"] = True + logger.info("✅ Book node completed successfully") + + return state + + return book_flight_node + + +def create_view_bookings_node(catalog: agentc.Catalog): + """Create a node for viewing existing bookings.""" + + def view_bookings_node(state: FlightSearchState) -> FlightSearchState: + """Handle view bookings queries with direct tool invocation. Fails fast on errors.""" + logger.info(f"👀 View node processing: {state['query']}") + + # ReAct-style logging: Tool invocation + logger.info("Thought: I'll retrieve all current flight bookings for the user") + logger.info("Action: retrieve_flight_bookings") + logger.info("Action Input: booking_query='' (empty string to get all bookings)") + + # Get tool and call with empty query to get all bookings via Agent Catalog + tool = catalog.find("tool", name="retrieve_flight_bookings") + response = tool.func(booking_query="") + + # Show full observation + logger.info(f"Observation: {response}") + + # Update state (state is a dict in LangGraph) + state["messages"].append(langchain_core.messages.AIMessage(content=response)) + state["search_results"] = [response] + state["resolved"] = True + logger.info("✅ View node completed successfully") + + return state + + return view_bookings_node + + +def create_search_reviews_node(catalog: agentc.Catalog): + """Create a node for searching airline reviews.""" + + def search_reviews_node(state: FlightSearchState) -> FlightSearchState: + """Handle airline review search queries with direct tool invocation. Fails fast on errors.""" + logger.info(f"⭐ Reviews node processing: {state['query']}") + + # Use the query as-is for searching reviews + # The tool expects natural language like "SpiceJet service quality" + search_query = state["query"] + + # ReAct-style logging: Tool invocation + logger.info("Thought: I'll search for airline reviews using vector similarity search") + logger.info("Action: search_airline_reviews") + logger.info(f"Action Input: query='{search_query}'") + + # Get tool and call directly via Agent Catalog + tool = catalog.find("tool", name="search_airline_reviews") + response = tool.func(query=search_query) + + # Show full observation + logger.info(f"Observation: {response}") + + # Update state (state is a dict in LangGraph) + state["messages"].append(langchain_core.messages.AIMessage(content=response)) + state["search_results"] = [response] + state["resolved"] = True + logger.info("✅ Reviews node completed successfully") + + return state + + return search_reviews_node + + +# ============================================================================ +# FlightSearchGraph - Router-Based Architecture +# ============================================================================ + class FlightSearchGraph(agentc_langgraph.graph.GraphRunnable): """Flight search conversation graph using Agent Catalog.""" @@ -359,35 +503,56 @@ def build_starting_state(query: str) -> FlightSearchState: query=query, resolved=False, search_results=[], + route_decision="", # Will be set by router ) def compile(self): - """Compile the LangGraph workflow.""" + """Compile the LangGraph workflow with router-based architecture.""" - # Build the flight search agent with catalog integration - search_agent = FlightSearchAgent( - catalog=self.catalog, span=self.span, chat_model=self.chat_model - ) + # Create specialized node functions using Agent Catalog + router = create_router_node(self.chat_model, self.catalog) + lookup_node = create_lookup_flights_node(self.catalog) + book_node = create_book_flight_node(self.catalog) + view_node = create_view_bookings_node(self.catalog) + reviews_node = create_search_reviews_node(self.catalog) - # Create a wrapper function for the ReActAgent - def flight_search_node(state: FlightSearchState) -> FlightSearchState: - """Wrapper function for the flight search ReActAgent.""" - return search_agent._invoke( - span=self.span, - state=state, - config={}, # Empty config for now - ) + # Define routing logic based on classification + def route_query(state: FlightSearchState) -> str: + """Route to appropriate node based on classification.""" + return state["route_decision"] - # Create a simple workflow graph for flight search + # Build the graph workflow = langgraph.graph.StateGraph(FlightSearchState) - # Add the flight search agent node using the wrapper function - workflow.add_node("flight_search", flight_search_node) + # Add all nodes + workflow.add_node("router", router) + workflow.add_node("lookup_flights", lookup_node) + workflow.add_node("book_flight", book_node) + workflow.add_node("view_bookings", view_node) + workflow.add_node("search_reviews", reviews_node) + + # Set entry point to router + workflow.set_entry_point("router") + + # Add conditional edges from router to specialized nodes + workflow.add_conditional_edges( + "router", + route_query, + { + "lookup": "lookup_flights", + "book": "book_flight", + "view": "view_bookings", + "reviews": "search_reviews", + }, + ) - # Set entry point and simple flow - workflow.set_entry_point("flight_search") - workflow.add_edge("flight_search", langgraph.graph.END) + # All specialized nodes end after execution + workflow.add_edge("lookup_flights", langgraph.graph.END) + workflow.add_edge("book_flight", langgraph.graph.END) + workflow.add_edge("view_bookings", langgraph.graph.END) + workflow.add_edge("search_reviews", langgraph.graph.END) + logger.info("✅ Router-based graph compiled successfully") return workflow.compile() diff --git a/notebooks/flight_search_agent_langraph/prompts/flight_search_assistant.yaml b/notebooks/flight_search_agent_langraph/prompts/flight_search_assistant.yaml index 384bf92..e01239c 100644 --- a/notebooks/flight_search_agent_langraph/prompts/flight_search_assistant.yaml +++ b/notebooks/flight_search_agent_langraph/prompts/flight_search_assistant.yaml @@ -8,8 +8,9 @@ name: flight_search_assistant # A description of where this prompt is used. # This field is mandatory, and will be used (indirectly) when performing semantic search for prompts. description: > - Professional flight search assistant with comprehensive tools for flight operations, booking management, and airline reviews. - Designed for Agent Catalog + Couchbase tutorial with clear task completion and error recovery. + Professional flight search assistant using router-based architecture with LangGraph. + Routes queries to specialized nodes for flight lookup, booking, viewing bookings, and searching airline reviews. + Designed for Agent Catalog + Couchbase tutorial with deterministic routing and clean tool invocation. # As a supplement to the description similarity search, users can optionally specify search annotations. # The values of these annotations MUST be strings (e.g., not 'true', but '"true"'). @@ -18,34 +19,44 @@ annotations: framework: "langgraph" database: "couchbase" tutorial: "flight-search-agent" + architecture: "router" # The tools associated with this prompt. tools: - name: "lookup_flight_info" - name: "save_flight_booking" - - name: "retrieve_flight_bookings" + - name: "retrieve_flight_bookings" - name: "search_airline_reviews" -# The main content of the prompt - simplified and focused +# The main content of the prompt - simplified for router architecture +# Note: This prompt is primarily for documentation. The router uses inline classification prompts. content: > - You are a flight search assistant. Help users find flights, book flights, view bookings, and search airline reviews. + You are a flight search assistant using router-based architecture. - You have access to the following tools: - {tools} + Query Classification: + - "lookup" → Find available flights + - "book" → Create new flight bookings + - "view" → Retrieve existing bookings + - "reviews" → Search airline passenger reviews +<<<<<<< Updated upstream CRITICAL FORMAT RULES: 1. Always start with "Thought:" before any Action 2. Use ONE tool per Action 3. NEVER write Final Answer in the same response as Action 4. Wait for Observation after each Action 5. Use simple parameters without quotes +======= + Each query is routed to a specialized node that handles that specific task using the appropriate tool. +>>>>>>> Stashed changes - TOOL INPUT EXAMPLES: - - lookup_flight_info → Action Input: JFK,LAX - - save_flight_booking → Action Input: LAX to JFK, tomorrow, 2 passengers, business class - - retrieve_flight_bookings → Action Input: - - search_airline_reviews → Action Input: SpiceJet service quality + Available Tools: + - lookup_flight_info: Find flights between airports + - save_flight_booking: Create new bookings + - retrieve_flight_bookings: View existing bookings + - search_airline_reviews: Search passenger reviews +<<<<<<< Updated upstream For retrieve_flight_bookings: Leave Action Input completely blank to get all bookings Use this exact format: @@ -62,4 +73,7 @@ content: > Today: {current_date} Question: {input} - Thought:{agent_scratchpad} \ No newline at end of file + Thought:{agent_scratchpad} +======= + Architecture: Router → Specialized Node → Direct Tool Call → Response +>>>>>>> Stashed changes diff --git a/notebooks/flight_search_agent_langraph/prompts/router_classifier.yaml b/notebooks/flight_search_agent_langraph/prompts/router_classifier.yaml new file mode 100644 index 0000000..7ec0a89 --- /dev/null +++ b/notebooks/flight_search_agent_langraph/prompts/router_classifier.yaml @@ -0,0 +1,44 @@ +# To signal to Agent Catalog that this file is a prompt, the 'record_kind' field must be set to 'prompt'. +record_kind: prompt + +# The name of the prompt must be a valid Python identifier (e.g., no spaces). +name: router_classifier + +# Description for semantic search +description: > + Intent classification prompt for routing flight search queries to specialized nodes. + Classifies queries into four categories: lookup (search flights), book (create booking), + view (retrieve bookings), or reviews (search airline reviews). + +# Annotations for filtering +annotations: + framework: "langgraph" + purpose: "router" + tutorial: "flight-search-agent" + +# No tools needed for classification +tools: [] + +# Classification prompt content +content: > + You are a flight query classifier. Analyze the user's query and classify it into EXACTLY ONE category. + + Categories: + - "lookup" : User wants to find/search for available flights + - "book" : User wants to create a new flight booking + - "view" : User wants to see their existing bookings + - "reviews": User wants to search airline reviews or passenger feedback + + Examples: + "Find flights from JFK to LAX" → lookup + "Show me flights to Miami" → lookup + "Book a flight from LAX to JFK tomorrow, 2 passengers, business class" → book + "Create a booking for next week" → book + "Show me my current bookings" → view + "What are my flights?" → view + "What do passengers say about SpiceJet?" → reviews + "How is the service on IndiGo?" → reviews + + User Query: {query} + + Respond with ONLY ONE WORD: lookup, book, view, or reviews