创建使用MCP服务器的AI Agent(第4部分)
现在到了神奇的部分!在第3部分中,我们构建了一个可以收集和分析反馈的MCP服务器。今天,我们将创建一个使用这个服务器的AI Agent来智能处理客户交互。学完本部分后,您将拥有一个可以实际运行的AI系统!
我们要构建什么
我们将创建一个AI Agent,它:
连接到我们的MCP服务器
使用LLM理解客户需求
自动收集和分析反馈
生成智能报告
建议业务改进
第1步:设置LLM集成
首先,安装所需的依赖包(下面的Agent代码还会导入`mcp`包;如果您的环境中尚未安装,请一并安装):
# 确保您的虚拟环境已激活
pip install openai python-dotenv aiohttp
bash
创建一个.env文件来存储您的API密钥:
# .env文件
OPENAI_API_KEY=your-api-key-here
# 从以下地址获取您的密钥:https://platform.openai.com/api-keys
第2步:构建我们的AI Agent
创建feedback_agent.py
#!/usr/bin/env python3
"""
AI Agent for Customer Feedback System
Intelligently handles feedback collection and analysis
"""

import os
import json
import asyncio
from typing import Dict, Any, List
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# For LLM integration
from openai import AsyncOpenAI

# For MCP client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class FeedbackAgent:
    """AI agent that collects and analyzes café customer feedback.

    The agent uses an OpenAI chat model for language understanding and an MCP
    client session (opened over stdio against ``feedback_server.py``) for
    storing and reading feedback.
    """

    def __init__(self):
        """Create the LLM client and describe how to launch the MCP server.

        No connection is opened here; ``connect_to_server`` does that.
        """
        self.name = "Feedback Assistant"
        self.llm = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        # Holds the live ClientSession while connect_to_server() is running,
        # and None at every other time.
        self.mcp_session = None
        self.server_params = StdioServerParameters(
            command="python",
            args=["feedback_server.py"]
        )

    # ------------------------------------------------------------------
    # Small, side-effect-free helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _count_items(listing: Any, field: str) -> int:
        """Return the number of entries in a list_tools()/list_resources() reply.

        The reply is a result object that carries its entries in an attribute
        named ``field`` rather than being a sequence itself, so ``len()`` cannot
        be applied to it directly. A plain sequence is accepted as well.
        """
        try:
            return len(getattr(listing, field, listing))
        except TypeError:
            return 0

    @staticmethod
    def _resource_text(result: Any) -> str:
        """Return the text carried by a read_resource() reply.

        Text parts found under ``result.contents`` are joined with newlines.
        Anything that has no text parts falls back to ``str(result)``, and a
        plain string is returned unchanged.
        """
        if isinstance(result, str):
            return result
        parts = getattr(result, "contents", None) or []
        texts = [part.text for part in parts if getattr(part, "text", None)]
        return "\n".join(texts) if texts else str(result)

    @staticmethod
    def _normalize_feedback(extracted: Dict[str, Any], conversation: str) -> Dict[str, Any]:
        """Turn the LLM's extraction into safe ``collect_feedback`` arguments.

        Args:
            extracted: Parsed JSON object returned by the extraction call.
            conversation: Original customer text, used when no feedback text
                was extracted.

        Returns:
            A dict with ``customer_name`` (non-empty string, "Anonymous" when
            missing or null), ``feedback`` (non-empty string) and ``rating``
            (integer clamped to 1-5, 3 when missing or not numeric).
        """
        name = extracted.get("customer_name")
        name = name.strip() if isinstance(name, str) else ""

        feedback = extracted.get("feedback")
        if isinstance(feedback, (list, tuple)):
            # The model may answer with a list of feedback points.
            feedback = "; ".join(str(point) for point in feedback)
        feedback = feedback.strip() if isinstance(feedback, str) else ""

        try:
            rating = int(round(float(extracted.get("rating"))))
        except (TypeError, ValueError, OverflowError):
            rating = 3

        return {
            "customer_name": name or "Anonymous",
            "feedback": feedback or conversation,
            "rating": max(1, min(5, rating)),
        }

    def _require_session(self):
        """Return the active MCP session or raise a clear error if not connected."""
        if self.mcp_session is None:
            raise RuntimeError("Not connected to the feedback server")
        return self.mcp_session

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------

    async def connect_to_server(self):
        """Open the MCP session, report what the server offers, and run the chat loop.

        The session stays open for as long as ``run_agent_loop`` runs and is
        closed when this coroutine returns or raises.
        """
        print("🔌 Connecting to feedback server...")

        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.mcp_session = session
                try:
                    # Initialize connection
                    await session.initialize()

                    # Get available tools and resources
                    tools = await session.list_tools()
                    resources = await session.list_resources()
                    tool_count = self._count_items(tools, "tools")
                    resource_count = self._count_items(resources, "resources")

                    print(f"✅ Connected! Found {tool_count} tools and {resource_count} resources")

                    # Keep the session active
                    await self.run_agent_loop()
                finally:
                    # Never leave a reference to a closed session behind.
                    self.mcp_session = None

    # ------------------------------------------------------------------
    # LLM-backed operations
    # ------------------------------------------------------------------

    async def think(self, task: str) -> str:
        """Ask the LLM to respond to ``task`` in the assistant's persona.

        Args:
            task: Instruction or question passed as the user message.

        Returns:
            The content of the first completion choice.
        """
        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": f"""You are {self.name}, an AI assistant for a café.
                    You help collect and analyze customer feedback.
                    Be friendly, professional, and insightful."""
                },
                {
                    "role": "user",
                    "content": task
                }
            ],
            temperature=0.7
        )

        return response.choices[0].message.content

    async def collect_customer_feedback(self, conversation: str) -> Dict[str, Any]:
        """Extract structured feedback from ``conversation`` and store it via MCP.

        Args:
            conversation: Raw customer message.

        Returns:
            A dict with ``status`` ("collected"), ``details`` (the parsed
            extraction) and ``server_response`` (the ``collect_feedback`` tool
            result).

        Raises:
            RuntimeError: If no MCP session is active.
        """
        session = self._require_session()

        # The key names are spelled out because the code below reads exactly
        # these keys from the model's answer.
        extraction_prompt = f"""
        Extract the following from this customer conversation:
        1. Customer name (if mentioned)
        2. The main feedback points
        3. Overall rating (1-5)
        4. Key topics mentioned

        Conversation:
        {conversation}

        Return a JSON object with exactly these keys:
        - "customer_name": string, or null if no name was mentioned
        - "feedback": string summarizing the main feedback points
        - "rating": integer from 1 to 5
        - "topics": list of strings
        """

        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Extract information and return valid JSON only."},
                {"role": "user", "content": extraction_prompt}
            ],
            temperature=0.3,
            response_format={"type": "json_object"}
        )

        try:
            extracted = json.loads(response.choices[0].message.content)
        except (TypeError, ValueError):
            # Empty or malformed answer: fall back to storing the raw message.
            extracted = {}
        if not isinstance(extracted, dict):
            extracted = {}

        # Use MCP tool to collect feedback
        result = await session.call_tool(
            "collect_feedback",
            arguments=self._normalize_feedback(extracted, conversation)
        )

        return {
            "status": "collected",
            "details": extracted,
            "server_response": result
        }

    async def analyze_feedback_trends(self) -> str:
        """Read recent feedback and summary data from MCP and ask the LLM for insights.

        Returns:
            The LLM's analysis text.

        Raises:
            RuntimeError: If no MCP session is active.
        """
        session = self._require_session()

        # Pass the resource text, not the reply object's repr, to the model.
        recent_feedback = self._resource_text(await session.read_resource("feedback://recent"))
        summary_data = self._resource_text(await session.read_resource("feedback://summary"))

        analysis_prompt = f"""
        Analyze this customer feedback data and provide:
        1. Key themes and patterns
        2. Areas needing immediate attention
        3. Positive aspects to maintain
        4. Specific recommendations for improvement

        Recent Feedback:
        {recent_feedback}

        Summary Statistics:
        {summary_data}
        """

        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "You are a business analyst. Provide actionable insights."
                },
                {"role": "user", "content": analysis_prompt}
            ],
            temperature=0.5
        )

        return response.choices[0].message.content

    async def handle_customer_interaction(self, message: str) -> str:
        """Classify ``message`` and answer it with the matching flow.

        "give_feedback" stores the feedback and thanks the customer,
        "check_status" answers from the feedback summary resource, and anything
        else gets a general LLM reply.

        Args:
            message: Raw customer message.

        Returns:
            The reply text to show to the customer.
        """
        intent_prompt = f"""
        Classify this message intent:
        - 'give_feedback': Customer wants to share feedback
        - 'check_status': Customer asking about their previous feedback
        - 'general_question': Other questions

        Message: {message}

        Return only the intent classification.
        """

        intent_response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify intent. Return only the classification."},
                {"role": "user", "content": intent_prompt}
            ],
            temperature=0.1
        )

        # An empty completion is treated as a general question.
        intent = (intent_response.choices[0].message.content or "").strip().lower()

        if "give_feedback" in intent:
            feedback_result = await self.collect_customer_feedback(message)
            return await self.think(
                f"Customer gave feedback. Details: {feedback_result}. "
                "Thank them and mention any immediate actions we'll take."
            )

        if "check_status" in intent:
            summary = self._resource_text(
                await self._require_session().read_resource("feedback://summary")
            )
            return await self.think(
                f"Customer asking about feedback status. Our summary: {summary}. "
                "Provide a helpful update."
            )

        return await self.think(f"Respond helpfully to: {message}")

    # ------------------------------------------------------------------
    # Interactive loop
    # ------------------------------------------------------------------

    async def run_agent_loop(self):
        """Read console input until the user quits, dispatching each line.

        'quit', 'exit' and 'bye' end the loop, 'analyze' prints a trend
        analysis, and any other non-empty line is handled as a customer
        message. Ctrl+C or end-of-input also ends the loop.
        """
        print(f"\n🤖 {self.name} is ready! Type 'quit' to exit.")
        print("Try: 'Hi, I'm Sarah. I loved the new latte recipe! 5 stars!'")

        while True:
            try:
                # NOTE(review): input() blocks the event loop while waiting for
                # the user; kept so Ctrl+C is raised directly inside this loop.
                user_input = input("\n👤 You: ").strip()

                if not user_input:
                    # Nothing to classify; avoid spending an LLM call on it.
                    continue

                command = user_input.lower()

                if command in ('quit', 'exit', 'bye'):
                    print(f"👋 {self.name}: Goodbye! Have a great day!")
                    break

                if command == 'analyze':
                    print(f"\n📊 {self.name}: Analyzing feedback trends...")
                    analysis = await self.analyze_feedback_trends()
                    print(f"📈 Analysis:\n{analysis}")
                    continue

                print(f"\n🤖 {self.name}: Processing...")
                response = await self.handle_customer_interaction(user_input)
                print(f"🤖 {self.name}: {response}")

            except (KeyboardInterrupt, EOFError):
                # EOFError means stdin was closed; without this branch the
                # generic handler below would report it forever.
                print(f"\n👋 {self.name}: Goodbye!")
                break
            except Exception as e:
                # Top-level boundary: report the failure and keep chatting.
                print(f"❌ Error: {e}")

# Script entry point: build the agent and hand control to its server session.
async def main():
    """Create a FeedbackAgent and run it until its chat loop ends."""
    await FeedbackAgent().connect_to_server()


if __name__ == "__main__":
    asyncio.run(main())
python
第3步:添加工作流引擎
创建workflow_engine.py来管理自动化任务:
#!/usr/bin/env python3
"""
Workflow Engine for Feedback System
Manages automated tasks and scheduled operations
"""

import asyncio
from typing import Dict, Any
from datetime import datetime, timedelta

class FeedbackWorkflow:
    """Named automation routines that run on top of a feedback agent.

    The agent passed in is expected to expose ``mcp_session``, ``think`` and
    ``analyze_feedback_trends``, which are the members used below.
    """

    def __init__(self, agent):
        """Store the agent and register the workflows by name."""
        self.agent = agent
        self.workflows = {
            "daily_analysis": self.daily_analysis_workflow,
            "negative_feedback_alert": self.negative_feedback_alert,
            "weekly_report": self.weekly_report_workflow
        }

    @staticmethod
    def _resource_text(result) -> str:
        """Return the text carried by a read_resource() reply.

        Text parts found under ``result.contents`` are joined with newlines.
        Anything that has no text parts falls back to ``str(result)``, and a
        plain string is returned unchanged.
        """
        if isinstance(result, str):
            return result
        parts = getattr(result, "contents", None) or []
        texts = [part.text for part in parts if getattr(part, "text", None)]
        return "\n".join(texts) if texts else str(result)

    async def daily_analysis_workflow(self):
        """Print the agent's trend analysis and raise an alert on a bad summary."""
        print("\n📊 Running daily analysis...")

        # The reply is a result object, so take its text before applying
        # string operations to it.
        summary = self._resource_text(
            await self.agent.mcp_session.read_resource("feedback://summary")
        )

        analysis = await self.agent.analyze_feedback_trends()
        print(f"📈 Daily Analysis Complete:\n{analysis}")

        # Crude text heuristic: the summary mentions "negative" and contains "50".
        if "negative" in summary.lower() and "50" in summary:
            print("⚠️ High negative feedback detected!")
            await self.negative_feedback_alert()

    async def negative_feedback_alert(self, feedback_data: Dict[str, Any] = None):
        """Announce a negative-feedback alert and optionally draft an action plan.

        Args:
            feedback_data: Optional feedback record. When given, its
                ``feedback`` entry (or the whole record if that key is absent)
                is sent to the agent to draft an action plan.

        Returns:
            The action plan text, or None when no ``feedback_data`` was given.
        """
        print("\n🚨 Negative feedback alert!")

        # Defined up front so an alert without data returns None instead of
        # failing on an unassigned name.
        response_plan = None

        if feedback_data:
            complaint = feedback_data.get("feedback", feedback_data)
            response_plan = await self.agent.think(
                f"Create action plan for this negative feedback: {complaint}. "
                "Include: immediate response, follow-up actions, and prevention measures."
            )

            print(f"📋 Action Plan:\n{response_plan}")

            # In production:
            # - Send alert to manager
            # - Create follow-up task
            # - Track resolution

        return response_plan

    async def weekly_report_workflow(self):
        """Ask the agent for an executive summary of the current feedback data.

        Returns:
            The report text produced by the agent.
        """
        print("\n📊 Generating weekly report...")

        session = self.agent.mcp_session
        summary = self._resource_text(await session.read_resource("feedback://summary"))
        recent = self._resource_text(await session.read_resource("feedback://recent"))

        report = await self.agent.think(
            f"Create executive summary for this week's feedback. "
            f"Data: {summary}\nRecent examples: {recent}\n"
            "Include: key metrics, trends, recommendations, and success stories."
        )

        print(f"📄 Weekly Report:\n{report}")
        return report

    async def run_workflow(self, workflow_name: str, *args, **kwargs):
        """Run the workflow registered under ``workflow_name``.

        Args:
            workflow_name: Key in ``self.workflows``.
            *args, **kwargs: Forwarded to the workflow coroutine.

        Returns:
            Whatever the workflow returns.

        Raises:
            ValueError: If ``workflow_name`` is not registered.
        """
        try:
            workflow = self.workflows[workflow_name]
        except KeyError:
            raise ValueError(f"Unknown workflow: {workflow_name}") from None
        return await workflow(*args, **kwargs)

class WorkflowScheduler:
    """Records workflow schedules and runs a short demo sequence.

    NOTE: ``run`` executes a fixed demo and does not consult
    ``scheduled_tasks``; the schedule entries are only recorded.
    """

    def __init__(self, workflow_engine):
        """Store the engine whose ``run_workflow`` coroutine will be awaited."""
        self.engine = workflow_engine
        self.scheduled_tasks = []

    def schedule_daily(self, hour: int, minute: int, workflow: str):
        """Record a workflow to run every day at ``hour``:``minute``.

        Args:
            hour: Hour of day, 0-23.
            minute: Minute of hour, 0-59.
            workflow: Name of the workflow to run.

        Raises:
            ValueError: If ``hour`` or ``minute`` is out of range.
        """
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            raise ValueError(f"Invalid time of day: {hour}:{minute}")
        self.scheduled_tasks.append({
            "type": "daily",
            "time": {"hour": hour, "minute": minute},
            "workflow": workflow
        })

    def schedule_on_event(self, event_type: str, workflow: str):
        """Record a workflow to run whenever ``event_type`` occurs."""
        self.scheduled_tasks.append({
            "type": "event",
            "trigger": event_type,
            "workflow": workflow
        })

    async def run(self):
        """Run the demo sequence: a daily analysis, then a simulated negative alert."""
        print("⏰ Workflow scheduler started")

        # For demo: simulate some workflows
        await asyncio.sleep(2)
        await self.engine.run_workflow("daily_analysis")

        # Simulate negative feedback
        await asyncio.sleep(3)
        demo_feedback = {
            "customer_name": "Demo User",
            "feedback": "The coffee was cold and service was slow",
            "rating": 2
        }
        await self.engine.run_workflow("negative_feedback_alert", demo_feedback)
python
第4步:整合所有组件
创建run_system.py来运行完整系统:
#!/usr/bin/env python3
"""
Run the Complete AI Feedback System
"""

import asyncio
import subprocess
import time
from feedback_agent import FeedbackAgent
from workflow_engine import FeedbackWorkflow, WorkflowScheduler

async def run_complete_system():
    """Configure the workflow scheduler and run the interactive agent.

    No separate server process is started here. The agent connects through an
    MCP stdio client configured with ``python feedback_server.py``, which
    launches the server itself. A second copy started with
    ``subprocess.Popen`` would never be connected to, would share the
    terminal's stdin with the chat prompt, and its output pipes would never
    be read.
    """
    print("🚀 Starting MCP Feedback Server...")

    try:
        # Create the agent (the connection is opened further below)
        print("🤖 Starting AI Agent...")
        agent = FeedbackAgent()

        # Create workflow engine
        workflow = FeedbackWorkflow(agent)
        scheduler = WorkflowScheduler(workflow)

        # Schedule some workflows.
        # NOTE(review): the schedule is only recorded; scheduler.run() is not
        # invoked from this function. "weekly_report" is registered as a daily
        # entry because the scheduler offers no weekly option.
        scheduler.schedule_daily(9, 0, "daily_analysis")
        scheduler.schedule_on_event("negative_feedback", "negative_feedback_alert")
        scheduler.schedule_daily(17, 0, "weekly_report")

        # Run the agent; returns when the chat loop ends.
        await agent.connect_to_server()

    finally:
        print("\n👋 System shutdown complete")


if __name__ == "__main__":
    print("🎯 Complete AI Feedback System Starting...\n")
    asyncio.run(run_complete_system())
python
第5步:测试您的AI系统
让我们测试完整系统:
# 确保您的.env文件包含您的OpenAI API密钥
python run_system.py
bash
尝试这些交互:
提供反馈:
"Hi, I'm Sarah. I loved the new latte recipe! 5 stars!"
"The service was terrible today. My order took 30 minutes."
检查分析:
输入:analyze
提问:
"What's the overall feedback trend?"
"How many people gave feedback today?"
我们实现的关键概念
1. Agent-服务器通信
Agent连接到MCP服务器并可以:
读取资源(反馈数据)
调用工具(收集反馈、分析情感)
智能处理结果
2. LLM集成
我们使用GPT-3.5来:
理解自然语言
从对话中提取信息
生成洞察和建议
创建个性化响应
3. 自动化工作流
我们的系统可以:
运行定时分析
响应事件(负面反馈)
自动生成报告
4. 智能决策制定
Agent:
分类用户意图
选择适当的行动
提供上下文响应
下一步是什么?
在我们的最后一部分(第5部分)中,我们将:
使用真实数据库添加数据持久性
实现高级功能(多语言支持、语音输入)
为生产使用部署系统
探索扩展策略
您现在已经构建了一个功能性的AI系统!您将如何扩展它以满足您的业务需求?哪些其他工作流会有帮助?
挑战:扩展您的系统
尝试添加这些功能:
负面反馈的电子邮件通知
情感趋势可视化
客户响应模板
多位置支持
在评论中分享您的扩展!
技术要点总结
AI Agent的核心功能
智能交互:使用LLM理解用户意图
自动反馈收集:从对话中提取结构化信息
趋势分析:生成业务洞察和建议
工作流自动化:定时任务和事件触发
系统架构特点
模块化设计:Agent、服务器、工作流引擎分离
异步处理:支持并发操作和实时响应
智能决策:基于LLM的意图识别和响应生成
可扩展性:易于添加新功能和工作流
实际应用价值
客户服务自动化:24/7智能客服支持
反馈管理:自动收集、分析和响应客户反馈
业务洞察:实时趋势分析和改进建议
工作流优化:自动化重复任务和报告生成
开发最佳实践
环境配置:使用.env文件管理API密钥
错误处理:完善的异常处理机制
日志记录:通过控制台输出跟踪系统运行状态(生产环境建议改用logging模块)
测试验证:完整的系统集成测试
通过这个教程,您已经掌握了构建完整AI系统的核心技能,可以开始创建自己的智能应用了!
Aa