MCP从理论到实战Python版:让AI真正”能干”
目录
前言:为什么学习MCP第1章 初识MCP:从”能说”到”能做”
1.1 大模型的困境:为什么需要MCP1.2 MCP是什么:AI的”万能插座”1.3 MCP如何工作:理解”餐厅服务流程”1.4 MCP能做什么:10个实际应用场景1.5 小结:开启MCP之旅 第2章 搭建MCP环境:5分钟开始实践
2.1 安装Python和基础工具2.2 安装MCP SDK2.3 在Cursor中配置MCP2.4 测试第一个MCP服务器2.5 小结:环境搭建完成 第3章 创建第一个MCP服务器:Hello World
3.1 理解MCP服务器的三大能力3.2 创建资源服务器:提供数据3.3 创建工具服务器:执行操作3.4 创建提示词服务器:引导对话3.5 小结:掌握三大核心功能 第4章 实战项目:构建实用MCP服务器
4.1 项目1:天气查询服务器4.2 项目2:网页抓取服务器4.3 项目3:数据库查询服务器4.4 项目4:Git操作服务器4.5 小结:从练习到实战 第5章 高级技巧:让MCP服务器更强大
5.1 错误处理和日志记录5.2 安全性和认证5.3 性能优化5.4 测试和调试5.5 部署和分发5.6 小结:最佳实践
前言:为什么学习MCP
欢迎阅读《MCP从理论到实战Python版》!
想象一下,你有一个超级聪明的助手(AI),它能理解你的问题、写文章、写代码。但是有一个问题:它只能”说”,不能”做”。
就像一个博学的教授,能回答你所有问题,但:
❌ 不能帮你查天气(需要联网获取实时数据)❌ 不能帮你管理文件(需要访问你的文件系统)❌ 不能帮你发邮件(需要调用邮件服务)❌ 不能帮你查数据库(需要连接数据库)
这就是大模型的局限——它很聪明,但缺少”手脚”。
MCP(Model Context Protocol)就是给AI装上”手脚”的技术。
什么是MCP?
MCP是一个标准协议,让AI能够:
📖 读取资源:访问文件、数据库、网页等外部数据🔧 使用工具:执行操作,如发送邮件、调用API、运行代码💡 获取提示:使用预定义的对话模板,提高交互效率
就像给智能手机安装APP一样,MCP让AI从”只会聊天”变成”能帮你干活”。
为什么选择本书?
✅ 通俗易懂
用生活化的比喻解释技术概念(餐厅服务、万能遥控器等)零基础也能看懂,有编程经验更轻松
✅ 实例丰富
每个概念都有完整的代码示例所有代码都经过测试,可以直接运行从Hello World到实战项目,循序渐进
✅ 即学即用
第2章就能运行你的第一个MCP服务器第4章开始构建实用项目学完立即能应用到工作中
✅ 贴近实战
4个完整的实战项目涵盖错误处理、安全性、性能优化等最佳实践解决真实开发中的常见问题
适合读者
编程初学者:想学习如何让AI”能干活”,本书用通俗语言讲解所有概念Python开发者:想快速掌握MCP开发,将AI能力集成到项目中AI应用开发者:想了解如何扩展AI的能力,让它能访问外部数据和工具学生和研究者:想学习最新的AI技术,为学术研究做准备
学习路径
第1-2章:基础篇(建议1-2天)
理解MCP的概念和价值搭建开发环境运行第一个MCP服务器
第3章:入门篇(建议2-3天)
掌握MCP的三大核心功能学习资源、工具、提示词的使用完成3个基础示例
第4章:实战篇(建议5-7天)
构建4个实用MCP服务器天气查询、文件管理、数据库、网页抓取学习完整的开发流程
第5章:进阶篇(建议3-5天)
掌握高级技巧和最佳实践错误处理、安全性、性能优化测试、调试、部署
如何使用本书
本书采用”理论+实践”的学习方式:
阅读概念:每章开头用生活化的例子解释概念运行代码:每个概念都有完整的可执行代码动手实践:跟着步骤一步步创建自己的MCP服务器构建项目:参考实战项目,结合自己的需求开发应用
准备工作
开始学习前,请确保:
✅ 有一台电脑(Mac、Windows或Linux都可以)✅ 安装了Python 3.8或更高版本✅ 安装了代码编辑器(推荐使用Cursor)✅ 有一颗好奇心和耐心!
致谢
感谢Anthropic团队开发了MCP协议,让AI应用开发变得更简单。
开始你的MCP之旅
现在,让我们一起开始这段精彩的MCP开发之旅吧!记住,学习新技术就像学骑自行车,不要害怕摔倒,多实践、多尝试,你一定能掌握!
第1章 初识MCP:从”能说”到”能做”
1.1 大模型的困境:为什么需要MCP
大模型很聪明,但缺少”手脚”
想象一下,你雇了一个超级聪明的助理。他:
知识渊博,能回答各种问题文笔出众,能写各种文章逻辑清晰,能分析复杂问题
但是,他有一个致命的问题:只能坐在办公室里,不能出去办事。
这就是当今大模型(如GPT-4、Claude、DeepSeek等)的现状。它们很聪明,但:
❌ 不能联网查询实时信息
用户:“今天北京天气怎么样?”AI:“抱歉,我无法获取实时天气信息。”
❌ 不能访问你的文件
用户:“帮我总结一下桌面上的report.pdf”AI:“抱歉,我无法访问你的文件系统。”
❌ 不能调用其他服务
用户:“帮我发一封邮件给老板”AI:“抱歉,我无法发送邮件。”
❌ 不能操作数据库
用户:“查询一下上个月的销售数据”AI:“抱歉,我无法访问你的数据库。”
这就像雇了一个博士生助理,但他只能坐在办公室里回答问题,不能出去帮你办事。
传统解决方案的问题
为了让AI”能干活”,开发者们想了很多办法:
方案1:Function Calling(函数调用)
AI可以”告诉”程序需要调用哪个函数,然后程序执行并返回结果。
# 伪代码示例
def get_weather(city):
return "晴天,25度"
# AI识别需要调用get_weather函数
result = get_weather("北京")
# AI使用返回的结果继续对话
问题:
❌ 每个AI服务有自己的调用方式(GPT用的方式和Claude不一样)❌ 开发者需要为每个AI重写一遍工具集成代码❌ 不同应用之间无法共享工具
方案2:RAG(检索增强生成)
把外部数据转换成文本,直接塞给AI。
# 伪代码示例
# 1. 读取文件内容
doc_content = read_file("report.pdf")
# 2. 把内容塞给AI
prompt = f"这是文件内容:{doc_content}
请总结这个文件"
问题:
❌ 只能读数据,不能执行操作(如发邮件、修改文件)❌ 数据量大时,很贵很慢(每次都要把整个文件内容发给AI)❌ 无法处理实时数据(如天气、股票价格)
方案3:自定义集成
为每个应用单独开发工具集成。
问题:
❌ 开发成本高(每个应用都要重新开发一遍)❌ 维护困难(有100个应用就要维护100套代码)❌ 无法复用(为Cursor开发的工具无法在Claude Desktop使用)
MCP的解决方案:统一接口
MCP就像是”万能遥控器”,解决了所有这些问题:
✅ 统一标准
所有AI应用都用同样的方式调用工具开发一次,到处使用
✅ 能力完整
不仅能读数据(资源),还能执行操作(工具)还能提供对话模板(提示词)
✅ 易于扩展
开发新工具很简单工具之间可以组合使用
✅ 高效经济
只传输必要的数据支持实时数据查询
1.2 MCP是什么:AI的”万能插座”
用生活例子理解MCP
比喻1:万能遥控器
你家里有很多电器:电视、空调、灯、窗帘。每个电器都有自己的遥控器。
❌ 传统方式:每个电器一个遥控器,遥控器一大堆,找起来很麻烦✅ MCP方式:一个万能遥控器控制所有电器,学习一次就会用
比喻2:手机APP商店
智能手机刚出来时,每个功能都要手机厂商自己开发(打电话、发短信、拍照)。
❌ 传统方式:手机厂商开发所有功能,功能有限且更新慢✅ APP商店模式:开发者可以开发各种APP,用户可以自由安装
MCP就是AI的”APP商店”,让开发者可以为AI开发各种”工具”。
MCP的三大核心能力
MCP让AI拥有三种能力:
1. 资源(Resources):读取数据
就像图书馆的书架,AI可以访问各种外部数据:
📖 资源示例:
- 文件内容(本地文件、云端文档)
- 数据库记录(用户数据、订单信息)
- 网页内容(新闻、产品信息)
- API数据(天气、股票价格)
2. 工具(Tools):执行操作
就像工具箱里的工具,AI可以执行各种操作:
🔧 工具示例:
- 发送邮件
- 创建/修改/删除文件
- 运行代码
- 调用API
- 数据库操作
3. 提示词(Prompts):对话模板
就像餐厅的菜单,预定义的对话流程:
💡 提示词示例:
- "帮我写一封商务邮件"
- "分析这个代码的性能问题"
- "总结这篇文章的要点"
MCP的架构:三个角色
MCP系统有三个核心角色,就像餐厅的三个角色:
1. MCP主机(MCP Host):餐厅大堂
就是你使用的AI应用(如Cursor、Claude Desktop)负责接收用户请求,协调所有服务
2. MCP客户端(MCP Client):服务员
理解用户需求决定需要调用哪些工具或资源就像服务员知道哪道菜需要哪个厨师做
3. MCP服务器(MCP Server):厨房窗口
提供具体的工具和资源执行实际操作并返回结果就像厨房的不同档口(凉菜档、热菜档、面点档)
1.3 MCP如何工作:理解”餐厅服务流程”
让我们用一个完整的例子来理解MCP的工作流程。
场景:用户要查询天气
1. 用户发起请求(点菜)
用户在Cursor中输入:"北京今天天气怎么样?"
2. MCP主机接收请求(大堂经理接单)
Cursor(MCP主机)接收到用户的问题
3. MCP客户端理解需求(服务员理解)
MCP客户端分析:
- 用户想要查询天气
- 需要调用天气服务器的get_weather工具
- 参数是:city="北京"
4. 调用MCP服务器(通知厨房)
MCP客户端调用天气服务器:
server.call_tool(
name="get_weather",
arguments={"city": "北京"}
)
5. MCP服务器执行操作(厨房做菜)
# 天气服务器的代码
def get_weather(city):
# 调用和风天气API
response = requests.get(f"https://api.qweather.com/v7/weather/now?location={city}")
return response.json()
6. 返回结果(上菜)
MCP服务器返回:
{
"temperature": "25°C",
"weather": "晴天",
"humidity": "60%"
}
7. MCP客户端处理结果(服务员确认)
MCP客户端把数据返回给AI
8. AI生成回复(顾客品尝)
AI使用返回的数据生成友好的回复:
"北京今天是晴天,气温25°C,湿度60%。天气不错,适合外出活动!"
9. 显示给用户(顾客满意)
用户在Cursor中看到AI的回复
完整流程图
graph TD
A["用户<br/>北京今天天气怎么样?"] --> B["MCP主机 Cursor<br/>接收用户请求"]
B --> C["MCP客户端<br/>分析: 需要调用天气工具<br/>参数: city=北京"]
C -->|call_tool get_weather| D["天气MCP服务器<br/>调用和风天气API<br/>获取实时天气数据"]
D -->|返回: temperature: 25°C| E["MCP客户端<br/>接收数据"]
E --> F["AI Claude<br/>生成友好回复"]
F --> G["MCP主机<br/>显示回复"]
G --> H["用户<br/>北京今天是晴天,气温25°C..."]
style A fill:#e1f5ff
style H fill:#e1f5ff
style D fill:#fff3e0
style F fill:#f3e5f5
1.4 MCP能做什么:10个实际应用场景
场景1:文件管理助手
用户:“帮我整理桌面上的文件,把图片放到Pictures文件夹”
MCP服务器提供:
资源:读取桌面文件列表工具:移动文件、创建文件夹
场景2:数据分析助手
用户:“分析一下上个月的销售数据,找出销量前10的产品”
MCP服务器提供:
资源:连接数据库,读取销售数据工具:执行SQL查询,生成图表
场景3:代码审查助手
用户:“检查这个项目的代码质量,找出潜在问题”
MCP服务器提供:
资源:读取项目文件工具:运行代码检查工具(如pylint、eslint)
场景4:邮件管理助手
用户:“给所有客户发送节日问候邮件”
MCP服务器提供:
资源:读取客户列表工具:发送邮件
场景5:网页监控助手
用户:“每天早上告诉我Python官网有没有发布新版本”
MCP服务器提供:
资源:抓取网页内容工具:定时检查,发送通知
场景6:文档生成助手
用户:“根据代码自动生成API文档”
MCP服务器提供:
资源:读取代码文件工具:解析代码,生成Markdown文档
场景7:智能搜索助手
用户:“在我的笔记中搜索关于’机器学习’的内容”
MCP服务器提供:
资源:读取所有笔记工具:全文搜索,相似度排序
场景8:自动化测试助手
用户:“运行所有单元测试,告诉我哪些失败了”
MCP服务器提供:
工具:运行pytest,解析测试结果
场景9:云服务管理助手
用户:“检查一下AWS服务器的运行状态”
MCP服务器提供:
资源:通过AWS API获取服务器信息工具:重启服务器,调整配置
场景10:多媒体处理助手
用户:“把这个视频转换成GIF,压缩到5MB以下”
MCP服务器提供:
工具:调用FFmpeg进行视频处理
1.5 小结:开启MCP之旅
在本章中,我们学习了:
✅ 大模型的局限
AI很聪明但缺少”手脚”不能访问外部数据和工具
✅ MCP的价值
给AI装上”手脚”统一的工具接口,一次开发到处使用
✅ MCP的三大能力
资源:读取外部数据工具:执行实际操作提示词:对话模板
✅ MCP的工作原理
主机、客户端、服务器三个角色像餐厅服务流程一样协作
✅ MCP的应用场景
文件管理、数据分析、代码审查等几乎可以做任何需要”手脚”的事情
下一章预告:
在第2章中,我们将动手搭建MCP开发环境,5分钟内运行你的第一个MCP服务器!准备好了吗?让我们继续!
第2章 搭建MCP环境:5分钟开始实践
在这一章,我们将:
✅ 安装Python和必要的工具✅ 安装MCP SDK✅ 在Cursor中配置MCP✅ 运行第一个MCP服务器✅ 测试MCP功能
不要担心,所有步骤都有详细说明,跟着做就行!
2.1 安装Python和基础工具
检查Python版本
MCP需要Python 3.8或更高版本。打开终端(Mac/Linux)或命令提示符(Windows),输入:
python3 --version
如果显示类似 ,说明已经安装了Python。
Python 3.10.0
如果提示”命令未找到”或版本低于3.8,请访问 python.org 下载安装最新版本。
安装Node.js(用于测试)
MCP Inspector是一个测试工具,需要Node.js。打开终端输入:
node --version
如果显示版本号(如),说明已安装。
v18.0.0
如果没有,请访问 nodejs.org 下载安装LTS版本。
安装代码编辑器
推荐使用 Cursor(内置AI功能,完美支持MCP):
访问 cursor.com下载并安装第一次打开会要求登录,可以用GitHub账号
2.2 安装MCP SDK
创建项目目录
# 创建项目文件夹
mkdir mcp-hello-world
cd mcp-hello-world
# 创建Python虚拟环境(推荐)
python3 -m venv venv
# 激活虚拟环境
# Mac/Linux:
source venv/bin/activate
# Windows:
venvScriptsactivate
安装MCP SDK
# 安装MCP Python SDK
pip install mcp
# 验证安装
pip show mcp
你应该看到类似这样的输出:
Name: mcp
Version: 1.5.0
Summary: Model Context Protocol SDK
...
恭喜!MCP SDK安装成功了!
2.3 在Cursor中配置MCP
步骤1:打开MCP配置
打开Cursor点击右上角的 设置图标 ⚙️在左侧菜单中找到 “Tools & MCP”点击底部的 “New MCP Server” 按钮(或点击 “Add a Custom MCP Server”)
步骤2:添加MCP服务器
有两种方式配置:
方式A:通过界面添加(推荐)
在 “Tools & MCP” 页面,点击 “New MCP Server”填写配置:
Name: Command: 使用虚拟环境的Python路径(见下方说明)Args: 点击 “Add Arg”,输入你的
hello-world 完整路径 点击 “Save”
server.py
⚠️ Command路径说明:
如果使用虚拟环境(推荐):
/完整路径/mcp-hello-world/venv/bin/python
例如: 如果全局安装mcp:
/Users/yourname/mcp-hello-world/venv/bin/python 或
python3
/opt/homebrew/bin/python3
方式B:编辑配置文件
在 “Tools & MCP” 页面顶部,点击 “Edit Config”(如果有的话)或者直接编辑配置文件:
Mac/Linux: Windows:
~/.cursor/mcp.json
%APPDATA%Cursormcp.json
添加配置:
配置示例(使用虚拟环境 – 推荐):
{
"mcpServers": {
"hello-world": {
"command": "/Users/yourname/mcp-hello-world/venv/bin/python",
"args": ["/Users/yourname/mcp-hello-world/server.py"]
}
}
}
配置示例(使用全局Python):
{
"mcpServers": {
"hello-world": {
"command": "python3",
"args": ["/Users/yourname/mcp-hello-world/server.py"]
}
}
}
⚠️ 重要提示:
将 替换为你的实际用户目录路径推荐使用虚拟环境配置,避免
/Users/yourname/ 错误Windows用户注意路径格式:
ModuleNotFoundError: No module named 'mcp'
C:\Users\yourname\mcp-hello-world\venv\Scripts\python.exe
获取完整路径:
cd mcp-hello-world
pwd # 显示项目完整路径,例如: /Users/yourname/mcp-hello-world
# Python路径: /Users/yourname/mcp-hello-world/venv/bin/python
# server.py路径: /Users/yourname/mcp-hello-world/server.py
步骤3:验证配置
配置完成后:
回到 “Tools & MCP” 页面你应该能看到 “hello-world” 出现在 “Installed MCP Servers” 列表中如果显示 ✅ 绿色状态,说明配置成功如果显示 ❌ 红色 “Error – Show Output”,点击查看错误信息
2.4 测试第一个MCP服务器
创建server.py文件
在 目录下创建
mcp-hello-world 文件:
server.py
#!/usr/bin/env python3
"""
最简单的MCP服务器 - Hello World
功能:提供一个问候资源,让AI能读取
"""
import asyncio
from typing import Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.types import Resource
from pydantic import AnyUrl
# 1. 创建MCP服务器实例
app = Server("hello-world-server")
# 2. 告诉MCP有哪些资源可用
@app.list_resources()
async def list_resources() -> list[Resource]:
"""列出所有可用资源"""
return [
Resource(
uri="hello://greeting",
name="问候消息",
description="一个简单的Hello World消息",
mimeType="text/plain"
)
]
# 3. 当AI请求读取资源时,返回内容
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
"""读取指定资源的内容"""
uri_str = str(uri).strip()
print(f"📖 收到读取请求,URI: {uri_str}") # 调试信息
if uri_str == "hello://greeting":
content = [
ReadResourceContents(
content="Hello, World! 🎉
欢迎来到MCP的世界!
恭喜你成功创建了第一个MCP资源服务器!",
mime_type="text/plain"
)
]
print(f"✅ 返回内容: {content[0].content}") # 调试信息
return content
error_msg = f"未知的资源URI: {uri_str}"
print(f"❌ 错误: {error_msg}") # 调试信息
raise ValueError(error_msg)
# 4. 启动服务器
async def main():
"""启动服务器主函数"""
print("=" * 50)
print("🚀 Hello World MCP服务器")
print("=" * 50)
print("📖 提供资源: hello://greeting")
print("✨ 服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
使用MCP Inspector测试
MCP Inspector是一个图形化测试工具,可以在浏览器中测试MCP服务器。
# 在项目目录下运行
npx @modelcontextprotocol/inspector python3 server.py
浏览器会自动打开,你会看到:
Resources标签:显示 资源点击这个资源,右侧会显示内容:
hello://greeting
Hello, World! 🎉...
恭喜!你的第一个MCP服务器运行成功了!🎉
代码解析
让我们理解这段代码的每一部分:
关键导入:
from mcp.server.lowlevel.helper_types import ReadResourceContents # ⚠️ 重要!
from pydantic import AnyUrl
核心概念:
# 1. 创建服务器实例
app = Server("hello-world-server") # 给服务器起个名字
# 2. list_resources(): 告诉AI有哪些资源
# 就像图书馆的目录,列出所有可用的书
@app.list_resources()
async def list_resources() -> list[Resource]:
return [Resource(...)] # 返回资源列表
# 3. read_resource(): 当AI要读某个资源时,返回内容
# 就像从书架上取书给读者看
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
uri_str = str(uri).strip() # 将URI转换为字符串
return [ReadResourceContents(...)] # ⚠️ 注意:用 ReadResourceContents,不是 TextContent
# 4. 启动服务器,开始监听AI的请求
asyncio.run(main())
⚠️ 重要区别:
✅ 资源返回值:使用 (有
ReadResourceContents 和
content 属性)❌ 不要用:
mime_type(那是工具的返回类型,不是资源的)
TextContent
在Cursor中测试
重启Cursor(确保加载新配置)打开聊天窗口输入:AI会自动调用你的MCP服务器,显示问候消息
读取hello://greeting资源的内容
2.5 常见问题与解决方案
在配置和运行MCP服务器时,可能会遇到以下问题:
问题1:ModuleNotFoundError: No module named ‘mcp’
错误现象:
Traceback (most recent call last):
File "/path/to/server.py", line 10, in <module>
from mcp.server import Server
ModuleNotFoundError: No module named 'mcp'
原因分析:
Cursor配置使用的Python环境中没有安装 包常见于macOS/Linux系统Python被包管理器(如Homebrew)管理
mcp
解决方案:
方法1:使用虚拟环境(推荐)
# 1. 在项目目录创建虚拟环境
cd mcp-hello-world
python3 -m venv venv
# 2. 激活虚拟环境
source venv/bin/activate # Mac/Linux
# 或 venvScriptsactivate # Windows
# 3. 安装mcp
pip install mcp
# 4. 获取虚拟环境Python路径
which python # Mac/Linux显示: /path/to/mcp-hello-world/venv/bin/python
# 或 where python # Windows
然后更新Cursor配置,使用虚拟环境的Python:
{
"mcpServers": {
"hello-world": {
"command": "/完整路径/mcp-hello-world/venv/bin/python",
"args": ["/完整路径/mcp-hello-world/server.py"]
}
}
}
方法2:全局安装(不推荐)
# Mac/Linux(可能需要sudo)
pip3 install --user mcp
# 或
python3 -m pip install --break-system-packages mcp
# Windows
pip install mcp
问题2:MCP error 0: 未知的资源
错误现象:
Cursor显示
MCP error 0: 未知的资源: hello://greeting
原因分析:
URI匹配逻辑有误 函数中的URI字符串比较不正确
read_resource
解决方案:
确保 函数正确处理URI:
read_resource
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
uri_str = str(uri).strip() # ✅ 转换为字符串并去除空格
print(f"📖 收到URI: {uri_str}") # 添加调试信息
if uri_str == "hello://greeting": # ✅ 使用处理后的字符串比较
return [ReadResourceContents(
content="Hello, World!",
mime_type="text/plain"
)]
raise ValueError(f"未知的资源URI: {uri_str}")
问题3:Cursor中看不到MCP服务器
原因分析:
配置文件格式错误配置文件路径不正确Python路径或server.py路径错误
解决方案:
检查配置文件位置:
Mac/Linux: Windows:
~/.cursor/mcp.json
%APPDATA%Cursormcp.json
验证JSON格式:
# Mac/Linux
cat ~/.cursor/mcp.json | python3 -m json.tool
确认路径使用绝对路径:
# 获取Python路径
which python # 或 where python (Windows)
# 获取server.py路径
cd mcp-hello-world
pwd # 显示当前目录完整路径
重启Cursor:
修改配置后,完全退出并重新打开Cursor
问题4:服务器启动但无响应
原因分析:
服务器代码有错误没有正确实现必需的处理函数
解决方案:
添加调试输出:
async def main():
print("=" * 50)
print("🚀 MCP服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
查看Cursor日志:
在 “Tools & MCP” 页面点击服务器旁边的 “Show Output”查看详细错误信息
2.6 小结:环境搭建完成
在本章中,我们:
✅ 安装了开发工具
Python 3.8+Node.js(用于测试)Cursor编辑器
✅ 安装了MCP SDK
使用pip安装mcp包创建了虚拟环境(最佳实践)
✅ 配置了Cursor
编辑MCP配置文件添加第一个MCP服务器
✅ 创建并测试了第一个服务器
编写了Hello World服务器使用Inspector测试在Cursor中成功调用
你已经完成了MCP开发的环境搭建!
回顾一下:
✅ 安装了Python和开发工具✅ 配置了Cursor编辑器✅ 运行了第一个MCP服务器✅ 看到了AI通过MCP读取资源
下一步:
现在你已经知道MCP”能用”了,但可能还不太了解它”能做什么”。在第3章中,我们将系统学习MCP的三大核心功能,让你真正掌握MCP的能力边界!
第3章 创建第一个MCP服务器:Hello World
在第2章中,我们运行了一个简单的Hello World服务器。在这一章,我们将深入学习MCP的三大核心功能,并为每个功能创建完整的示例。
3.1 理解MCP服务器的三大能力
MCP服务器可以提供三种能力,就像一个多才多艺的助手:
1. 资源(Resources):提供数据
比喻:像图书馆的书架
AI可以”借阅”各种数据:
📄 文件内容📊 数据库记录🌐 网页内容📈 API数据
特点:
只读,不修改数据可以有很多资源AI决定何时读取
生活例子:
用户:"总结一下桌面上的report.pdf"
AI:"我需要读取这个文件"
→ 调用资源:file://desktop/report.pdf
→ 获取内容,然后总结
2. 工具(Tools):执行操作
比喻:像工具箱里的工具
AI可以”使用”各种工具:
✉️ 发送邮件📝 创建/修改文件🗄️ 数据库操作🔧 调用API
特点:
会修改数据或产生副作用需要参数有明确的输入和输出
生活例子:
用户:"给张三发邮件,告诉他明天开会"
AI:"我需要发送邮件"
→ 调用工具:send_email(to="zhangsan@example.com", subject="明天开会", body="...")
→ 邮件发送成功
3. 提示词(Prompts):对话模板
比喻:像餐厅的套餐菜单
预定义的对话流程,让AI知道如何处理常见任务:
📧 “写商务邮件”模板📊 “数据分析报告”模板🐛 “代码审查”模板
特点:
预定义的对话结构可以有参数提高对话效率
生活例子:
用户选择:"写商务邮件"提示词
→ AI:"好的,请告诉我收件人、主题和要点"
→ 用户提供信息
→ AI按照模板生成专业邮件
三种能力的对比
| 能力 | 作用 | 副作用 | 类比 |
|---|---|---|---|
| 资源 | 读取数据 | ❌ 无 | 图书馆借书 |
| 工具 | 执行操作 | ✅ 有 | 使用工具修理东西 |
| 提示词 | 引导对话 | ❌ 无 | 餐厅菜单 |
3.2 创建资源服务器:提供数据
在第2章,我们创建了一个返回固定文本的Hello World资源。现在,让我们更进一步:创建一个能读取真实文件的”笔记本资源服务器”。
学习目标:
✅ 动态列出多个资源(而不是固定的1个)✅ 从文件系统读取真实数据✅ 错误处理(文件不存在等)
项目结构
notebook-resource-server/
├── notes/ # 笔记文件夹
│ ├── meeting-2024-01.md
│ ├── ideas.md
│ └── todo.md
├── server.py # MCP服务器
└── requirements.txt # 依赖
步骤1:创建项目
# 创建项目目录
mkdir notebook-resource-server
cd notebook-resource-server
# 创建笔记文件夹
mkdir notes
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
# 安装MCP
pip install mcp
步骤2:创建示例笔记
创建 :
notes/meeting-2024-01.md
# 2024年1月会议记录
## 日期:2024-01-15
### 参会人员
- 张三(产品经理)
- 李四(技术负责人)
- 王五(设计师)
### 讨论内容
1. 新版本功能规划
2. UI设计方案评审
3. 开发时间表确认
### 决定事项
- 2月1日发布新版本
- 增加暗黑模式
- 优化移动端体验
### 待办事项
- [ ] 张三:完成需求文档
- [ ] 李四:评估技术方案
- [ ] 王五:设计界面原型
创建 :
notes/ideas.md
# 创意想法
## AI助手改进
- 增加语音输入功能
- 支持多语言
- 离线模式
## 用户体验优化
- 简化注册流程
- 增加新手引导
- 优化加载速度
创建 :
notes/todo.md
# 待办清单
## 本周任务
- [x] 完成项目文档
- [ ] 修复已知bug
- [ ] 准备演示PPT
## 下周计划
- [ ] 开始新功能开发
- [ ] 代码审查
- [ ] 性能测试
步骤3:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
笔记本资源服务器
让AI能够读取你的笔记文件
"""
import asyncio
import os
from pathlib import Path
from typing import Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.types import Resource
from pydantic import AnyUrl
# 创建MCP服务器
app = Server("notebook-resource-server")
# 笔记文件夹路径
NOTES_DIR = Path(__file__).parent / "notes"
@app.list_resources()
async def list_resources() -> list[Resource]:
"""
列出所有笔记文件
就像列出图书馆的所有书籍
"""
resources = []
# 检查notes文件夹是否存在
if not NOTES_DIR.exists():
print(f"警告:笔记文件夹不存在: {NOTES_DIR}")
return resources
# 遍历notes文件夹中的所有.md文件
for note_file in NOTES_DIR.glob("*.md"):
# 创建资源URI,格式: note://文件名
uri = f"note://{note_file.stem}"
resources.append(
Resource(
uri=uri,
name=note_file.stem,
description=f"笔记:{note_file.stem}",
mimeType="text/markdown"
)
)
print(f"📚 找到 {len(resources)} 个笔记")
return resources
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
"""
读取笔记内容
就像从图书馆借书阅读
"""
uri_str = str(uri).strip()
# 解析URI,提取文件名
# 例如:note://meeting-2024-01 -> meeting-2024-01
if not uri_str.startswith("note://"):
raise ValueError(f"无效的URI: {uri_str}")
filename = uri_str.replace("note://", "")
file_path = NOTES_DIR / f"{filename}.md"
# 检查文件是否存在
if not file_path.exists():
raise ValueError(f"笔记不存在: {filename}")
# 读取文件内容
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
print(f"📖 读取笔记: {filename}")
# 返回内容
return [
ReadResourceContents(
content=content,
mime_type="text/markdown"
)
]
except Exception as e:
raise ValueError(f"读取笔记失败: {str(e)}")
async def main():
"""启动服务器"""
print("=" * 50)
print("📓 笔记本资源服务器")
print("=" * 50)
print(f"📁 笔记目录: {NOTES_DIR}")
print("🚀 服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤4:测试服务器
使用MCP Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在浏览器中:
查看 Resources 标签,应该看到3个笔记点击 ,查看内容尝试读取其他笔记
meeting-2024-01
在Cursor中配置和测试:
编辑Cursor的MCP配置:
{
"mcpServers": {
"notebook": {
"command": "python3",
"args": ["/完整路径/notebook-resource-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/notebook-resource-server/server.py
重启Cursor,然后在聊天中输入:
"请列出我的所有笔记"
"读取会议记录笔记的内容"
"总结ideas笔记的要点"
代码解释
让我们理解关键部分:
1. 列出资源(list_resources)
@app.list_resources()
async def list_resources() -> list[Resource]:
# 像图书馆的图书目录,列出所有可用的书
for note_file in NOTES_DIR.glob("*.md"):
uri = f"note://{note_file.stem}"
# 返回资源列表
关键点:
返回 类型每个
list[Resource] 包含:
Resource
:唯一标识(如
uri)
note://meeting-2024-01:显示名称
name:描述
description:内容类型
mimeType
2. 读取资源(read_resource)
@app.read_resource()
async def read_resource(uri: str) -> ReadResourceContents:
# 根据URI找到对应的文件
# 读取内容并返回
return [TextContent(type="text", text=content)]
关键点:
接收 参数返回
uri(即
ReadResourceContents)如果资源不存在,抛出
Sequence[TextContent]
ValueError
扩展练习
尝试为服务器添加更多功能:
支持子文件夹
# 修改list_resources,支持递归查找
for note_file in NOTES_DIR.rglob("*.md"):
# rglob递归查找所有.md文件
添加搜索功能
# 在read_resource中添加搜索关键词的能力
if uri.startswith("search://"):
keyword = uri.replace("search://", "")
# 搜索包含关键词的笔记
支持其他文件格式
# 支持.txt, .json等格式
for file in NOTES_DIR.glob("*.*"):
# 根据扩展名设置不同的mimeType
3.2小结
恭喜!你已经掌握了MCP资源服务器的核心能力:
✅ 使用 列出多个资源
list_resources()
✅ 使用 读取资源内容
read_resource()
✅ 从文件系统读取真实数据
✅ 处理错误和边界情况
关键理解:
资源是只读的,AI只能查看,不能修改资源适合提供参考数据:文档、配置、历史记录等URI就像图书馆的书号,用于唯一标识资源
下一步:你已经让AI能”读”数据了,现在让我们赋予AI”写”的能力!
3.3 创建工具服务器:执行操作
从读到写的跃进:
在3.2节中,我们学会了如何让AI”读取”数据(资源)。现在,让我们赋予AI”执行操作”的能力(工具)。
资源 vs 工具的区别:
📚 资源:只读,像借书,不能修改🔧 工具:可写,像使用工具,会产生实际效果
学习目标:
✅ 创建可以执行操作的工具(而不是只读资源)✅ 处理工具参数(工具需要输入)✅ 实现安全验证(防止恶意操作)
项目结构
file-manager-server/
├── workspace/ # 工作目录
├── server.py # MCP服务器
└── requirements.txt # 依赖
步骤1:创建项目
# 创建项目目录
mkdir file-manager-server
cd file-manager-server
# 创建工作目录
mkdir workspace
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装MCP
pip install mcp
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
文件管理工具服务器
让AI能够创建、读取、修改和删除文件
"""
import asyncio
import os
from pathlib import Path
from typing import Any, Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 创建MCP服务器
app = Server("file-manager-server")
# 工作目录
WORKSPACE = Path(__file__).parent / "workspace"
WORKSPACE.mkdir(exist_ok=True)
def validate_path(filename: str) -> Path:
"""
验证文件路径的安全性
防止路径遍历攻击,如:../../etc/passwd
"""
# 构建完整路径
full_path = (WORKSPACE / filename).resolve()
# 确保路径在workspace内
if not str(full_path).startswith(str(WORKSPACE.resolve())):
raise ValueError(f"不安全的路径: {filename}")
return full_path
@app.list_tools()
async def list_tools() -> list[Tool]:
"""
列出所有可用工具
就像列出工具箱里的所有工具
"""
return [
Tool(
name="create_file",
description="创建新文件。如果文件已存在会覆盖。",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "文件名(相对于workspace目录)"
},
"content": {
"type": "string",
"description": "文件内容"
}
},
"required": ["filename", "content"]
}
),
Tool(
name="read_file",
description="读取文件内容",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "文件名(相对于workspace目录)"
}
},
"required": ["filename"]
}
),
Tool(
name="delete_file",
description="删除文件",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "文件名(相对于workspace目录)"
}
},
"required": ["filename"]
}
),
Tool(
name="list_files",
description="列出workspace中的所有文件",
inputSchema={
"type": "object",
"properties": {}
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""
执行工具
就像使用工具箱里的工具完成任务
"""
if name == "create_file":
# 创建文件
filename = arguments["filename"]
content = arguments["content"]
try:
file_path = validate_path(filename)
# 创建父目录(如果不存在)
file_path.parent.mkdir(parents=True, exist_ok=True)
# 写入文件
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
print(f"✅ 创建文件: {filename}")
return [
TextContent(
type="text",
text=f"文件创建成功: {filename}
文件大小: {len(content)} 字符"
)
]
except Exception as e:
print(f"❌ 创建文件失败: {e}")
return [
TextContent(
type="text",
text=f"创建文件失败: {str(e)}"
)
]
elif name == "read_file":
# 读取文件
filename = arguments["filename"]
try:
file_path = validate_path(filename)
if not file_path.exists():
return [
TextContent(
type="text",
text=f"文件不存在: {filename}"
)
]
# 读取文件
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
print(f"📖 读取文件: {filename}")
return [
TextContent(
type="text",
text=f"文件: {filename}
{content}"
)
]
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return [
TextContent(
type="text",
text=f"读取文件失败: {str(e)}"
)
]
elif name == "delete_file":
# 删除文件
filename = arguments["filename"]
try:
file_path = validate_path(filename)
if not file_path.exists():
return [
TextContent(
type="text",
text=f"文件不存在: {filename}"
)
]
# 删除文件
file_path.unlink()
print(f"🗑️ 删除文件: {filename}")
return [
TextContent(
type="text",
text=f"文件删除成功: {filename}"
)
]
except Exception as e:
print(f"❌ 删除文件失败: {e}")
return [
TextContent(
type="text",
text=f"删除文件失败: {str(e)}"
)
]
elif name == "list_files":
# 列出所有文件
try:
files = []
for item in WORKSPACE.rglob("*"):
if item.is_file():
# 获取相对路径
rel_path = item.relative_to(WORKSPACE)
# 获取文件大小
size = item.stat().st_size
files.append(f"📄 {rel_path} ({size} bytes)")
if not files:
result = "workspace目录为空"
else:
result = "文件列表:
" + "
".join(files)
print(f"📋 列出 {len(files)} 个文件")
return [
TextContent(
type="text",
text=result
)
]
except Exception as e:
print(f"❌ 列出文件失败: {e}")
return [
TextContent(
type="text",
text=f"列出文件失败: {str(e)}"
)
]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
"""启动服务器"""
print("=" * 50)
print("🗂️ 文件管理工具服务器")
print("=" * 50)
print(f"📁 工作目录: {WORKSPACE}")
print("🚀 服务器启动中...")
print("=" * 50)
print("可用工具:")
print(" • create_file - 创建文件")
print(" • read_file - 读取文件")
print(" • delete_file - 删除文件")
print(" • list_files - 列出所有文件")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
使用MCP Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在浏览器中:
查看 Tools 标签,应该看到4个工具测试 :
create_file
{
"filename": "hello.txt",
"content": "Hello, MCP!"
}
测试 ,查看创建的文件测试
list_files,读取文件内容测试
read_file,删除文件
delete_file
在Cursor中配置和测试:
编辑Cursor的MCP配置:
{
"mcpServers": {
"file-manager": {
"command": "python3",
"args": ["/完整路径/file-manager-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/file-manager-server/server.py
重启Cursor,然后在聊天中输入:
"创建一个名为shopping-list.txt的文件,内容是:牛奶、面包、鸡蛋"
"列出workspace中的所有文件"
"读取shopping-list.txt的内容"
"删除shopping-list.txt"
代码解释
1. 定义工具(list_tools)
Tool(
name="create_file",
description="创建新文件...",
inputSchema={
"type": "object",
"properties": {
"filename": {"type": "string", ...},
"content": {"type": "string", ...}
},
"required": ["filename", "content"]
}
)
关键点:
:工具名称(AI调用时使用)
name:工具描述(帮助AI理解何时使用)
description:JSON Schema格式的参数定义
inputSchema
:参数列表
properties:必需参数
required
2. 执行工具(call_tool)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
if name == "create_file":
# 执行创建文件操作
# 返回操作结果
elif name == "read_file":
# 执行读取文件操作
关键点:
接收 (工具名)和
name(参数)返回
arguments(操作结果)应该处理异常并返回友好的错误信息
Sequence[TextContent]
3. 安全性验证(validate_path)
def validate_path(filename: str) -> Path:
full_path = (WORKSPACE / filename).resolve()
# 确保路径在workspace内
if not str(full_path).startswith(str(WORKSPACE.resolve())):
raise ValueError(f"不安全的路径: {filename}")
这个函数防止路径遍历攻击:
❌ – 危险!❌
../../etc/passwd – 危险!✅
/etc/passwd – 安全✅
hello.txt – 安全
subfolder/hello.txt
扩展练习
添加文件修改工具
Tool(
name="append_to_file",
description="在文件末尾添加内容",
inputSchema={...}
)
添加文件搜索工具
Tool(
name="search_files",
description="搜索包含关键词的文件",
inputSchema={...}
)
添加文件重命名工具
Tool(
name="rename_file",
description="重命名文件",
inputSchema={...}
)
3.3小结
恭喜!你现在掌握了MCP工具服务器的核心能力:
✅ 使用 列出可用工具
list_tools()
✅ 使用 执行工具操作
call_tool()
✅ 定义工具的参数Schema
✅ 实现安全验证(路径遍历防护)
✅ 返回清晰的执行结果
资源 vs 工具对比:
| 特性 | 资源(Resource) | 工具(Tool) |
|---|---|---|
| 作用 | 提供数据 | 执行操作 |
| 副作用 | ❌ 无(只读) | ✅ 有(会修改) |
| 参数 | URI(资源标识) | 任意参数 |
| 返回 | 数据内容 | 执行结果 |
| 示例 | 读取文件、查询数据 | 创建文件、发邮件 |
下一步:AI现在会”读”和”写”了,但有时它不知道该做什么。让我们教它一些”工作模板”!
3.4 创建提示词服务器:引导对话
从”能做”到”会做”:
我们已经学会了:
3.2:资源 – AI能”读”数据3.3:工具 – AI能”做”事情
但有时候,AI不知道该怎么用这些能力。提示词就像”任务模板”,告诉AI应该如何完成特定任务。
生活例子:
🔧 没有模板:“给我一把锤子”(AI不知道要做什么)📋 有模板:“组装IKEA家具”(AI知道:1. 拿锤子 2. 看说明书 3. 按步骤组装)
学习目标:
✅ 创建对话模板(预定义的工作流程)✅ 使用参数动态生成提示词✅ 提高AI的工作效率
项目结构
writing-prompts-server/
├── server.py
└── requirements.txt
步骤1:创建项目
mkdir writing-prompts-server
cd writing-prompts-server
python3 -m venv venv
source venv/bin/activate
pip install mcp
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
写作助手提示词服务器
提供各种写作场景的对话模板
"""
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Prompt, PromptMessage, TextContent, GetPromptResult
# 创建MCP服务器
app = Server("writing-prompts-server")
@app.list_prompts()
async def list_prompts() -> list[Prompt]:
"""
列出所有可用的提示词
就像餐厅的菜单,列出所有套餐
"""
return [
Prompt(
name="business-email",
description="生成商务邮件",
arguments=[
{
"name": "recipient",
"description": "收件人姓名",
"required": True
},
{
"name": "purpose",
"description": "邮件目的(如:会议邀请、进度汇报、合作提案)",
"required": True
},
{
"name": "key_points",
"description": "要点(用逗号分隔)",
"required": True
},
{
"name": "tone",
"description": "语气(正式/友好/严肃),默认:正式",
"required": False
}
]
),
Prompt(
name="blog-post",
description="撰写博客文章",
arguments=[
{
"name": "topic",
"description": "文章主题",
"required": True
},
{
"name": "target_audience",
"description": "目标读者(如:初学者、专业人士)",
"required": True
},
{
"name": "word_count",
"description": "字数要求(如:800、1500),默认:1000",
"required": False
}
]
),
Prompt(
name="code-review",
description="代码审查报告",
arguments=[
{
"name": "language",
"description": "编程语言",
"required": True
},
{
"name": "focus_areas",
"description": "关注点(如:性能、安全性、可读性),用逗号分隔",
"required": True
}
]
),
Prompt(
name="meeting-summary",
description="会议纪要",
arguments=[
{
"name": "meeting_topic",
"description": "会议主题",
"required": True
},
{
"name": "participants",
"description": "参会人员,用逗号分隔",
"required": True
}
]
)
]
@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> GetPromptResult:
"""
根据提示词名称和参数生成消息
就像根据菜单点的套餐生成订单
"""
if name == "business-email":
# 商务邮件提示词
recipient = arguments["recipient"]
purpose = arguments["purpose"]
key_points = arguments["key_points"]
tone = arguments.get("tone", "正式")
prompt_text = f"""请帮我撰写一封商务邮件,要求如下:
**收件人**: {recipient}
**邮件目的**: {purpose}
**要点**: {key_points}
**语气**: {tone}
请生成一封专业的商务邮件,包括:
1. 适当的称呼
2. 清晰的邮件正文(包含所有要点)
3. 礼貌的结束语
4. 合适的签名位置
邮件应该:
- 结构清晰,层次分明
- 语言{tone}得体
- 突出重点
- 便于阅读"""
return GetPromptResult(
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=prompt_text
)
)
]
)
elif name == "blog-post":
# 博客文章提示词
topic = arguments["topic"]
target_audience = arguments["target_audience"]
word_count = arguments.get("word_count", "1000")
prompt_text = f"""请帮我撰写一篇博客文章,要求如下:
**主题**: {topic}
**目标读者**: {target_audience}
**字数**: 约{word_count}字
请撰写一篇高质量的博客文章,包括:
1. **引人入胜的标题**
2. **吸引人的开头**(引出主题,激发兴趣)
3. **结构清晰的正文**:
- 使用小标题划分段落
- 观点明确,论证充分
- 包含实际例子或案例
4. **总结和号召行动**
文章风格要求:
- 适合{target_audience}阅读
- 通俗易懂,避免过多术语
- 逻辑清晰,层次分明
- 可读性强"""
return GetPromptResult(
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=prompt_text
)
)
]
)
elif name == "code-review":
# 代码审查提示词
language = arguments["language"]
focus_areas = arguments["focus_areas"]
prompt_text = f"""请对以下{language}代码进行详细审查。
**关注重点**: {focus_areas}
请从以下方面进行审查:
1. **代码质量**
- 命名规范
- 代码可读性
- 注释完整性
2. **功能正确性**
- 逻辑是否正确
- 边界条件处理
- 错误处理
3. **{focus_areas}**(重点关注)
4. **改进建议**
- 具体的改进方案
- 示例代码(如果需要)
请提供:
- ✅ 做得好的地方
- ⚠️ 需要改进的地方
- 💡 优化建议
现在,请粘贴需要审查的代码,我将进行详细分析。"""
return GetPromptResult(
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=prompt_text
)
)
]
)
elif name == "meeting-summary":
# 会议纪要提示词
meeting_topic = arguments["meeting_topic"]
participants = arguments["participants"]
prompt_text = f"""请帮我整理会议纪要,信息如下:
**会议主题**: {meeting_topic}
**参会人员**: {participants}
请根据会议内容生成标准的会议纪要,包括:
1. **会议基本信息**
- 日期、时间
- 地点
- 参会人员
2. **讨论内容**
- 主要议题
- 讨论要点
- 不同观点
3. **决定事项**
- 达成的共识
- 确定的方案
- 分工安排
4. **待办事项**
- 任务清单
- 责任人
- 截止时间
5. **下次会议安排**(如有)
请以清晰、专业的格式整理,便于存档和分发。
现在,请告诉我会议的具体内容,我将帮你整理成纪要。"""
return GetPromptResult(
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=prompt_text
)
)
]
)
else:
raise ValueError(f"未知的提示词: {name}")
async def main():
"""启动服务器"""
print("=" * 50)
print("✍️ 写作助手提示词服务器")
print("=" * 50)
print("🚀 服务器启动中...")
print("=" * 50)
print("可用提示词:")
print(" • business-email - 商务邮件")
print(" • blog-post - 博客文章")
print(" • code-review - 代码审查")
print(" • meeting-summary - 会议纪要")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
使用MCP Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在浏览器中:
查看 Prompts 标签,应该看到4个提示词测试 :
business-email
{
"recipient": "张经理",
"purpose": "项目进度汇报",
"key_points": "完成了需求分析,遇到技术难题,需要额外资源",
"tone": "正式"
}
查看生成的提示消息
在Cursor中配置和测试:
编辑Cursor的MCP配置:
{
"mcpServers": {
"writing-prompts": {
"command": "python3",
"args": ["/完整路径/writing-prompts-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/writing-prompts-server/server.py
步骤4:在Cursor中使用提示词
重启Cursor:配置完成后,完全关闭并重新打开Cursor,让MCP服务器加载。
在聊天界面使用提示词:
打开聊天界面
在Cursor中,按 (Mac)或
Cmd+L(Windows/Linux)打开AI聊天面板
Ctrl+L
找到提示词入口
在聊天输入框的上方或左侧,你会看到一些图标按钮查找 提示词图标(通常显示为 📝、✨ 或 “Prompts” 文字按钮)如果看不到,尝试:
点击输入框左侧的 “+” 或 “⚡” 图标或者直接在输入框中输入 ,看是否出现提示词选项
/
选择提示词
点击提示词图标后,会弹出提示词列表在列表中找到并点击 (显示为”生成商务邮件”)
business-email
填写参数
选择提示词后,会弹出参数填写表单按照提示填写:
收件人:例如 “张经理”邮件目的:例如 “项目进度汇报”要点:例如 “完成了需求分析,遇到技术难题,需要额外资源”语气(可选):例如 “正式”、“友好” 或 “严肃” 填写完成后,点击 “提交” 或 “生成” 按钮
查看结果
AI会根据你填写的参数,自动生成完整的提示消息这个提示消息会被发送给AI,AI会根据模板要求帮你撰写商务邮件
💡 提示:
如果找不到提示词入口,确保MCP服务器已正确配置并重启了Cursor可以在Cursor的MCP日志中查看服务器是否正常运行不同版本的Cursor界面可能略有差异,但提示词功能通常在聊天输入框附近
代码解释
1. 定义提示词(list_prompts)
Prompt(
name="business-email",
description="生成商务邮件",
arguments=[
{
"name": "recipient",
"description": "收件人姓名",
"required": True
},
...
]
)
关键点:
:提示词名称
name:描述
description:参数列表
arguments
:参数名
name:参数描述
description:是否必需
required
2. 生成提示消息(get_prompt)
@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> GetPromptResult:
prompt_text = f"根据参数生成提示..."
return GetPromptResult(
messages=[
PromptMessage(
role="user",
content=TextContent(type="text", text=prompt_text)
)
]
)
关键点:
接收 和
name返回
arguments 对象,包含
GetPromptResult 列表
messages 的
PromptMessage 必须是
content 对象
TextContent
提示词的最佳实践
清晰的结构
prompt_text = """
1. 第一部分
2. 第二部分
3. 第三部分
"""
具体的要求
# ✅ 好
"写一篇800字的文章,面向初学者,包含3个实例"
# ❌ 不好
"写篇文章"
合理的默认值
tone = arguments.get("tone", "正式") # 提供默认值
word_count = arguments.get("word_count", "1000")
3.5 小结:掌握三大核心功能
在本章中,我们深入学习了MCP的三大核心功能:
✅ 资源(Resources)
让AI能读取外部数据创建了笔记本资源服务器学会了 和
list_resources
read_resource
✅ 工具(Tools)
让AI能执行实际操作创建了文件管理工具服务器学会了 和
list_tools理解了安全性验证的重要性
call_tool
✅ 提示词(Prompts)
让AI知道如何处理常见任务创建了写作助手提示词服务器学会了 和
list_prompts掌握了提示词的最佳实践
get_prompt
三种能力对比:
| 功能 | 作用 | 装饰器 | 返回类型 | 副作用 |
|---|---|---|---|---|
| 资源 | 读取数据 | |
|
❌ 无 |
| 工具 | 执行操作 | |
|
✅ 有 |
| 提示词 | 引导对话 | |
|
❌ 无 |
学习建议:
按照难度顺序学习第4章:
⭐ 天气查询(30分钟)- 最简单,先学这个建立信心⭐⭐ 网页抓取(1小时)- 中等难度,练习HTTP和解析⭐⭐⭐ 数据库查询(1.5小时)- 需要SQL知识⭐⭐⭐⭐ Git操作(2小时)- 最复杂,需要Git基础
下一章预告:
在第4章中,我们将构建4个完整的实战项目,让你的MCP技能从理论走向实战!每个项目都是工作中常用的功能,学完后你就能开发自己的MCP应用了!
第4章 实战项目:构建实用MCP服务器
在前面的章节中,我们学习了MCP的基本概念和三大核心功能。现在,让我们把这些知识应用到实际项目中!
代码质量保证:
✅ 所有代码都经过验证,可以直接运行
✅ 每个项目都包含完整的依赖说明
✅ 所有函数都有正确的类型注解
✅ 包含完善的错误处理和日志输出
在这一章,我们将构建4个实用的MCP服务器,按照从简单到复杂的顺序:
| 难度 | 项目 | 核心技能 | 学习时间 |
|---|---|---|---|
| ⭐ | 天气查询服务器 | 调用REST API、JSON处理 | 30分钟 |
| ⭐⭐ | 网页抓取服务器 | HTTP请求、HTML解析 | 1小时 |
| ⭐⭐⭐ | 数据库查询服务器 | SQL查询、数据结构 | 1.5小时 |
| ⭐⭐⭐⭐ | Git操作服务器 | Git命令、版本控制 | 2小时 |
每个项目都包含:
✅ 完整的源代码(可直接运行)✅ 详细的说明(逐行注释)✅ 测试方法(Inspector + Cursor)✅ 实际应用场景
4.1 项目1:天气查询服务器 ⭐
为什么从这个开始?
✅ 最简单:只需要调用API,处理JSON✅ 见效快:30分钟就能看到结果✅ 实用性强:真实的天气数据
这个项目将创建一个能查询实时天气的MCP服务器,使用和风天气API(免费)。
应用场景
📱 在Cursor中快速查询天气🤖 让AI助手告诉你出行是否需要带伞📊 分析多个城市的天气情况
项目结构
weather-server/
├── server.py # 主服务器
├── config.py # 配置文件
├── .env # API密钥(不提交到Git)
└── requirements.txt # 依赖
步骤1:注册和风天气API
访问 和风天气开发平台注册账号(免费)创建应用,获取API Key免费版每天1000次请求
步骤2:创建项目
mkdir weather-server
cd weather-server
python3 -m venv venv
source venv/bin/activate
pip install mcp requests python-dotenv
创建 :
requirements.txt
mcp>=1.0.0
requests>=2.31.0
python-dotenv>=1.0.0
步骤3:配置API密钥
创建 文件(注意添加到
.env):
.gitignore
QWEATHER_API_KEY=你的API密钥
⚠️ 重要:将 替换为从和风天气平台获取的实际 API Key,例如:
你的API密钥
QWEATHER_API_KEY=1234567890abcdef
创建 :
.gitignore
.env
venv/
__pycache__/
*.pyc
步骤4:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
天气查询MCP服务器
使用和风天气API查询实时天气信息
"""
import asyncio
import os
from typing import Any, Sequence
import requests
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 加载环境变量
load_dotenv()
# 创建MCP服务器
app = Server("weather-server")
# 和风天气API配置
API_KEY = os.getenv("QWEATHER_API_KEY")
# 需要在https://console.qweather.com/setting?lang=zh找到自己的API host,host不对会遇到403的问题
BASE_URL = "https://pv7jpguvya.re.qweatherapi.com/v7"
# 城市ID映射(常用城市)
CITY_IDS = {
"北京": "101010100",
"上海": "101020100",
"广州": "101280101",
"深圳": "101280601",
"杭州": "101210101",
"成都": "101270101",
"西安": "101110101",
"武汉": "101200101",
"南京": "101190101",
"天津": "101030100"
}
def get_weather_now(city: str) -> dict:
"""
获取实时天气
Args:
city: 城市名称
Returns:
天气信息字典
"""
# 获取城市ID
location = CITY_IDS.get(city)
if not location:
return {"error": f"不支持的城市: {city}。支持的城市: {', '.join(CITY_IDS.keys())}"}
# 检查API Key
if not API_KEY:
return {"error": "未配置QWEATHER_API_KEY,请在.env文件中添加"}
# 调用API
try:
url = f"{BASE_URL}/weather/now"
params = {
"location": location,
"key": API_KEY
}
response = requests.get(url, params=params, timeout=10)
data = response.json()
if data.get("code") != "200":
return {"error": f"API错误: {data.get('code')}"}
# 解析天气数据
now = data.get("now", {})
return {
"city": city,
"temperature": now.get("temp", "N/A"),
"feels_like": now.get("feelsLike", "N/A"),
"weather": now.get("text", "N/A"),
"wind_direction": now.get("windDir", "N/A"),
"wind_scale": now.get("windScale", "N/A"),
"humidity": now.get("humidity", "N/A"),
"update_time": now.get("obsTime", "N/A")
}
except requests.Timeout:
return {"error": "请求超时,请稍后重试"}
except Exception as e:
return {"error": f"查询失败: {str(e)}"}
def get_weather_forecast(city: str, days: int = 3) -> dict:
"""
获取天气预报
Args:
city: 城市名称
days: 预报天数(3或7)
Returns:
预报信息字典
"""
location = CITY_IDS.get(city)
if not location:
return {"error": f"不支持的城市: {city}"}
if not API_KEY:
return {"error": "未配置QWEATHER_API_KEY"}
# 选择预报接口
endpoint = "weather/3d" if days == 3 else "weather/7d"
try:
url = f"{BASE_URL}/{endpoint}"
params = {
"location": location,
"key": API_KEY
}
response = requests.get(url, params=params, timeout=10)
data = response.json()
if data.get("code") != "200":
return {"error": f"API错误: {data.get('code')}"}
# 解析预报数据
daily = data.get("daily", [])
forecasts = []
for day in daily[:days]:
forecasts.append({
"date": day.get("fxDate", "N/A"),
"temp_max": day.get("tempMax", "N/A"),
"temp_min": day.get("tempMin", "N/A"),
"weather_day": day.get("textDay", "N/A"),
"weather_night": day.get("textNight", "N/A"),
"wind_direction": day.get("windDirDay", "N/A"),
"wind_scale": day.get("windScaleDay", "N/A")
})
return {
"city": city,
"forecasts": forecasts
}
except Exception as e:
return {"error": f"查询失败: {str(e)}"}
def format_weather_now(data: dict) -> str:
"""格式化实时天气信息"""
if "error" in data:
return f"❌ {data['error']}"
return f"""📍 {data['city']} 实时天气
🌡️ **温度**: {data['temperature']}°C(体感 {data['feels_like']}°C)
☁️ **天气**: {data['weather']}
💨 **风向**: {data['wind_direction']} {data['wind_scale']}级
💧 **湿度**: {data['humidity']}%
🕐 更新时间: {data['update_time']}"""
def format_weather_forecast(data: dict) -> str:
"""格式化天气预报"""
if "error" in data:
return f"❌ {data['error']}"
lines = [f"📍 {data['city']} 天气预报
"]
for forecast in data['forecasts']:
lines.append(f"""📅 **{forecast['date']}**
🌡️ 温度: {forecast['temp_min']}°C ~ {forecast['temp_max']}°C
☀️ 白天: {forecast['weather_day']}
🌙 夜间: {forecast['weather_night']}
💨 风向: {forecast['wind_direction']} {forecast['wind_scale']}级
""")
return "
".join(lines)
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="get_weather",
description=f"查询实时天气。支持的城市: {', '.join(CITY_IDS.keys())}",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称",
"enum": list(CITY_IDS.keys())
}
},
"required": ["city"]
}
),
Tool(
name="get_forecast",
description=f"查询天气预报。支持的城市: {', '.join(CITY_IDS.keys())}",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称",
"enum": list(CITY_IDS.keys())
},
"days": {
"type": "integer",
"description": "预报天数(3或7天)",
"enum": [3, 7],
"default": 3
}
},
"required": ["city"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
if name == "get_weather":
city = arguments["city"]
print(f"🌤️ 查询 {city} 实时天气...")
data = get_weather_now(city)
result = format_weather_now(data)
return [TextContent(type="text", text=result)]
elif name == "get_forecast":
city = arguments["city"]
days = arguments.get("days", 3)
print(f"📅 查询 {city} {days}天预报...")
data = get_weather_forecast(city, days)
result = format_weather_forecast(data)
return [TextContent(type="text", text=result)]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
"""启动服务器"""
print("=" * 50)
print("🌤️ 天气查询MCP服务器")
print("=" * 50)
if not API_KEY:
print("⚠️ 警告: 未配置QWEATHER_API_KEY")
print("请创建.env文件并添加: QWEATHER_API_KEY=你的密钥")
print("获取API密钥: https://dev.qweather.com/")
else:
print(f"✅ API密钥已配置: {API_KEY[:8]}...")
print(f"📍 支持城市: {', '.join(CITY_IDS.keys())}")
print("🚀 服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤5:测试服务器
测试实时天气:
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中测试:
{
"city": "北京"
}
在Cursor中配置:
{
"mcpServers": {
"weather": {
"command": "python3",
"args": ["/完整路径/weather-server/server.py"],
"env": {
"QWEATHER_API_KEY": "你的API密钥"
}
}
}
}
⚠️ 配置说明:
将 替换为实际路径,如
/完整路径/将
/Users/yourname/weather-server/server.py 替换为实际的和风天气 API Key或者使用
你的API密钥 文件配置(推荐),则不需要在这里配置
.env
env
在Cursor中测试:
"北京今天天气怎么样?"
"查询上海未来3天的天气预报"
"深圳和广州哪个城市更热?"
扩展功能
添加空气质量查询添加生活指数(穿衣、运动等)支持更多城市(通过城市搜索API)添加天气预警功能
4.2 项目2:网页抓取服务器 ⭐⭐
学习要点:
🌐 HTTP请求处理📄 HTML解析(BeautifulSoup)🔍 数据提取技巧🛡️ 网页抓取的安全性和道德规范
创建一个能抓取和分析网页内容的MCP服务器,让AI能够读取和理解网页。
应用场景
📰 让AI帮你总结新闻📊 抓取竞品网站的产品信息🔍 监控网页内容变化
项目结构
web-scraper-server/
├── server.py
└── requirements.txt
步骤1:创建项目
mkdir web-scraper-server
cd web-scraper-server
python3 -m venv venv
source venv/bin/activate
pip install mcp requests beautifulsoup4
创建 :
requirements.txt
mcp>=1.0.0
requests>=2.31.0
beautifulsoup4>=4.12.0
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
网页抓取MCP服务器
提供网页内容抓取和解析功能
"""
import asyncio
from typing import Any, Sequence
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 创建MCP服务器
app = Server("web-scraper-server")
def fetch_webpage(url: str, timeout: int = 10) -> dict:
"""
抓取网页
Args:
url: 网页URL
timeout: 超时时间(秒)
Returns:
包含网页信息的字典
"""
try:
# 验证URL
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return {"error": "无效的URL"}
# 发送请求
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
}
response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status()
# 解析HTML
soup = BeautifulSoup(response.text, "html.parser")
# 提取信息
title = soup.title.string if soup.title else "无标题"
# 移除script和style标签
for script in soup(["script", "style"]):
script.decompose()
# 获取文本内容
text = soup.get_text(separator="
", strip=True)
# 获取所有链接
links = []
for link in soup.find_all("a", href=True):
links.append({
"text": link.get_text(strip=True),
"href": link["href"]
})
# 获取所有图片
images = []
for img in soup.find_all("img", src=True):
images.append({
"alt": img.get("alt", ""),
"src": img["src"]
})
return {
"url": url,
"title": title,
"text": text[:5000], # 限制长度
"links": links[:50], # 前50个链接
"images": images[:20], # 前20个图片
"status_code": response.status_code
}
except requests.Timeout:
return {"error": "请求超时"}
except requests.RequestException as e:
return {"error": f"请求失败: {str(e)}"}
except Exception as e:
return {"error": f"解析失败: {str(e)}"}
def format_webpage_info(data: dict) -> str:
"""格式化网页信息"""
if "error" in data:
return f"❌ {data['error']}"
lines = [
f"🌐 **{data['title']}**",
f"📍 URL: {data['url']}",
f"📊 状态: {data['status_code']}",
"",
"📄 **页面内容** (前5000字符):",
data['text'],
"",
f"🔗 **链接** ({len(data['links'])}个):"
]
for link in data['links'][:10]: # 只显示前10个
if link['text']:
lines.append(f" - {link['text']}: {link['href']}")
if len(data['links']) > 10:
lines.append(f" ... 还有 {len(data['links']) - 10} 个链接")
return "
".join(lines)
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="fetch_page",
description="抓取网页内容,返回标题、文本、链接和图片",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "网页URL(必须包含http://或https://)"
}
},
"required": ["url"]
}
),
Tool(
name="extract_links",
description="提取网页中的所有链接",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "网页URL"
}
},
"required": ["url"]
}
),
Tool(
name="extract_text",
description="提取网页的纯文本内容(去除HTML标签)",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "网页URL"
}
},
"required": ["url"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
if name == "fetch_page":
url = arguments["url"]
print(f"🌐 抓取网页: {url}")
data = fetch_webpage(url)
result = format_webpage_info(data)
return [TextContent(type="text", text=result)]
elif name == "extract_links":
url = arguments["url"]
print(f"🔗 提取链接: {url}")
data = fetch_webpage(url)
if "error" in data:
return [TextContent(type="text", text=f"❌ {data['error']}")]
lines = [f"🔗 {data['title']} 的链接:
"]
for i, link in enumerate(data['links'], 1):
if link['text']:
lines.append(f"{i}. {link['text']}")
lines.append(f" {link['href']}
")
return [TextContent(type="text", text="
".join(lines))]
elif name == "extract_text":
url = arguments["url"]
print(f"📄 提取文本: {url}")
data = fetch_webpage(url)
if "error" in data:
return [TextContent(type="text", text=f"❌ {data['error']}")]
result = f"📄 {data['title']}
{data['text']}"
return [TextContent(type="text", text=result)]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
"""启动服务器"""
print("=" * 50)
print("🌐 网页抓取MCP服务器")
print("=" * 50)
print("🚀 服务器启动中...")
print("=" * 50)
print("可用工具:")
print(" • fetch_page - 抓取网页完整信息")
print(" • extract_links - 提取所有链接")
print(" • extract_text - 提取纯文本")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
npx @modelcontextprotocol/inspector python3 server.py
测试URL:
https://www.python.org"
在Cursor中配置:
{
"mcpServers": {
"web-scraper": {
"command": "python3",
"args": ["/完整路径/web-scraper-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/web-scraper-server/server.py
在Cursor中测试:
"抓取Python官网的内容"
"提取https://news.ycombinator.com的所有链接"
"总结这个网页的主要内容:https://example.com"
安全注意事项
⚠️ 重要提示:
遵守robots.txt:尊重网站的爬虫规则请求频率限制:不要短时间内大量请求用户代理:设置合适的User-Agent法律合规:仅抓取公开信息,不侵犯版权
4.3 项目3:数据库查询服务器 ⭐⭐⭐
学习要点:
📊 SQL查询语法🗄️ SQLite数据库操作🔒 SQL注入防护📋 数据格式化输出
创建一个能操作SQLite数据库的MCP服务器,实现数据查询和分析。
应用场景
📊 让AI帮你分析数据库数据🔍 用自然语言查询数据📝 让AI生成SQL报表
项目结构
database-server/
├── server.py
├── init_db.py # 初始化数据库
├── sample.db # 示例数据库
└── requirements.txt
步骤1:创建项目
mkdir database-server
cd database-server
python3 -m venv venv
source venv/bin/activate
pip install mcp
创建 :
requirements.txt
mcp>=1.0.0
步骤2:初始化示例数据库
创建 :
init_db.py
#!/usr/bin/env python3
"""
初始化示例数据库
创建一个简单的图书管理系统数据库
"""
import sqlite3
from pathlib import Path
DB_PATH = Path(__file__).parent / "sample.db"
def init_database():
"""初始化数据库"""
# 删除旧数据库
if DB_PATH.exists():
DB_PATH.unlink()
# 创建连接
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 创建表
cursor.execute("""
CREATE TABLE books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
author TEXT NOT NULL,
category TEXT NOT NULL,
price REAL NOT NULL,
stock INTEGER NOT NULL,
published_year INTEGER
)
""")
# 插入示例数据
books = [
("Python编程:从入门到实践", "埃里克·马瑟斯", "编程", 89.0, 50, 2023),
("深度学习", "伊恩·古德费洛", "AI", 168.0, 30, 2022),
("算法导论", "托马斯·科尔曼", "算法", 128.0, 25, 2021),
("设计模式", "Gang of Four", "编程", 99.0, 40, 2020),
("机器学习实战", "彼得·哈林顿", "AI", 79.0, 60, 2023),
("数据结构与算法", "马克·艾伦·韦斯", "算法", 89.0, 35, 2022),
("Flask Web开发", "Miguel Grinberg", "编程", 79.0, 45, 2023),
("自然语言处理", "Christopher Manning", "AI", 138.0, 20, 2021)
]
cursor.executemany(
"INSERT INTO books (title, author, category, price, stock, published_year) VALUES (?, ?, ?, ?, ?, ?)",
books
)
conn.commit()
print(f"✅ 数据库初始化完成: {DB_PATH}")
print(f"📚 插入了 {len(books)} 本书")
# 显示数据
cursor.execute("SELECT * FROM books")
for row in cursor.fetchall():
print(f" {row[0]}. {row[1]} - {row[2]} (¥{row[4]})")
conn.close()
if __name__ == "__main__":
init_database()
运行初始化:
python3 init_db.py
步骤3:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
数据库查询MCP服务器
提供SQLite数据库的查询和操作功能
"""
import asyncio
import sqlite3
from pathlib import Path
from typing import Any, Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.types import Tool, Resource, TextContent
from pydantic import AnyUrl
# 创建MCP服务器
app = Server("database-server")
# 数据库路径
DB_PATH = Path(__file__).parent / "sample.db"
def get_connection():
"""获取数据库连接"""
if not DB_PATH.exists():
raise FileNotFoundError(f"数据库不存在: {DB_PATH}。请先运行 init_db.py")
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row # 返回字典格式
return conn
def execute_query(sql: str) -> list[dict]:
"""
执行SQL查询
Args:
sql: SQL查询语句
Returns:
查询结果列表
"""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql)
# 获取列名
columns = [description[0] for description in cursor.description]
# 获取数据
rows = []
for row in cursor.fetchall():
rows.append(dict(zip(columns, row)))
conn.close()
return rows
except Exception as e:
raise ValueError(f"SQL执行失败: {str(e)}")
def format_table(rows: list[dict]) -> str:
"""格式化表格"""
if not rows:
return "查询结果为空"
# 获取列名
columns = list(rows[0].keys())
# 计算列宽
widths = {col: len(col) for col in columns}
for row in rows:
for col in columns:
widths[col] = max(widths[col], len(str(row[col])))
# 生成表格
lines = []
# 表头
header = " | ".join(col.ljust(widths[col]) for col in columns)
lines.append(header)
lines.append("-" * len(header))
# 数据行
for row in rows:
line = " | ".join(str(row[col]).ljust(widths[col]) for col in columns)
lines.append(line)
return "
".join(lines)
@app.list_resources()
async def list_resources() -> list[Resource]:
"""列出数据库表"""
try:
conn = get_connection()
cursor = conn.cursor()
# 获取所有表名
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
conn.close()
# 创建资源列表
resources = []
for table in tables:
resources.append(
Resource(
uri=f"db://table/{table}",
name=f"表: {table}",
description=f"数据库表 {table} 的内容",
mimeType="text/plain"
)
)
return resources
except Exception as e:
print(f"❌ 获取表列表失败: {e}")
return []
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
"""读取表内容"""
uri_str = str(uri).strip()
if not uri_str.startswith("db://table/"):
raise ValueError(f"无效的URI: {uri_str}")
table_name = uri_str.replace("db://table/", "")
try:
# 查询表内容
sql = f"SELECT * FROM {table_name} LIMIT 100"
rows = execute_query(sql)
# 格式化结果
result = f"表 {table_name} 的内容(前100条):
"
result += format_table(rows)
result += f"
共 {len(rows)} 条记录"
return [ReadResourceContents(content=result, mime_type="text/plain")]
except Exception as e:
raise ValueError(f"读取表失败: {str(e)}")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="query_database",
description="执行SQL查询(SELECT语句)。返回查询结果的表格。",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL查询语句(仅支持SELECT)"
}
},
"required": ["sql"]
}
),
Tool(
name="get_schema",
description="获取表的结构信息(列名、类型等)",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "表名"
}
},
"required": ["table_name"]
}
),
Tool(
name="search_books",
description="搜索图书(示例功能)",
inputSchema={
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "搜索关键词(在标题或作者中搜索)"
},
"category": {
"type": "string",
"description": "分类筛选(可选)",
"enum": ["编程", "AI", "算法"]
}
},
"required": ["keyword"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
if name == "query_database":
sql = arguments["sql"].strip()
# 安全检查:只允许SELECT
if not sql.upper().startswith("SELECT"):
return [TextContent(
type="text",
text="❌ 错误:仅支持SELECT查询,不允许修改数据"
)]
print(f"📊 执行查询: {sql}")
try:
rows = execute_query(sql)
result = f"查询结果:
{format_table(rows)}
共 {len(rows)} 条记录"
return [TextContent(type="text", text=result)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 查询失败: {str(e)}"
)]
elif name == "get_schema":
table_name = arguments["table_name"]
print(f"📋 获取表结构: {table_name}")
try:
conn = get_connection()
cursor = conn.cursor()
# 获取表结构
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
if not columns:
return [TextContent(
type="text",
text=f"❌ 表不存在: {table_name}"
)]
# 格式化结果
lines = [f"表 {table_name} 的结构:
"]
for col in columns:
cid, name, type_, notnull, default, pk = col
nullable = "NOT NULL" if notnull else "NULL"
primary = " PRIMARY KEY" if pk else ""
lines.append(f" - {name}: {type_} {nullable}{primary}")
# 获取行数
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
lines.append(f"
总行数: {count}")
conn.close()
return [TextContent(type="text", text="
".join(lines))]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 获取表结构失败: {str(e)}"
)]
elif name == "search_books":
keyword = arguments["keyword"]
category = arguments.get("category")
print(f"🔍 搜索图书: {keyword}")
# 构建SQL
sql = "SELECT * FROM books WHERE (title LIKE ? OR author LIKE ?)"
params = [f"%{keyword}%", f"%{keyword}%"]
if category:
sql += " AND category = ?"
params.append(category)
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql, params)
rows = []
for row in cursor.fetchall():
rows.append(dict(row))
conn.close()
if not rows:
return [TextContent(
type="text",
text=f"未找到包含 '{keyword}' 的图书"
)]
result = f"搜索结果(关键词:{keyword}):
{format_table(rows)}
共找到 {len(rows)} 本书"
return [TextContent(type="text", text=result)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 搜索失败: {str(e)}"
)]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
"""启动服务器"""
print("=" * 50)
print("🗄️ 数据库查询MCP服务器")
print("=" * 50)
print(f"📁 数据库路径: {DB_PATH}")
if not DB_PATH.exists():
print("⚠️ 数据库不存在,请先运行: python3 init_db.py")
return
print("🚀 服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤4:测试服务器
# 先初始化数据库
python3 init_db.py
# 测试服务器
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中测试:
{
"keyword": "Python"
}
在Cursor中配置:
{
"mcpServers": {
"database": {
"command": "python3",
"args": ["/完整路径/database-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/database-server/server.py
在Cursor中测试:
"列出数据库中所有的表"
"查询图书表的所有记录"
"搜索作者包含'马瑟斯'的图书"
"统计每个分类有多少本书"
"查找价格超过100元的图书"
4.4 项目4:Git操作服务器 ⭐⭐⭐⭐
学习要点:
🔧 GitPython库的使用📚 Git核心概念(commit、branch、diff)🌲 版本控制工作流📊 Git数据分析
创建一个能操作Git仓库的MCP服务器,让AI成为你的Git助手。
应用场景
📊 让AI分析代码变更🔍 查询commit历史📝 生成changelog
项目结构
git-server/
├── server.py
└── requirements.txt
步骤1:创建项目
mkdir git-server
cd git-server
python3 -m venv venv
source venv/bin/activate
pip install mcp GitPython
创建 :
requirements.txt
mcp>=1.0.0
GitPython>=3.1.40
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
Git操作MCP服务器
提供Git仓库的查询和分析功能
"""
import asyncio
from pathlib import Path
from typing import Any, Sequence
from datetime import datetime
from git import Repo, GitCommandError
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.types import Tool, Resource, TextContent
from pydantic import AnyUrl
# 创建MCP服务器
app = Server("git-server")
# 默认仓库路径(可以通过环境变量配置)
DEFAULT_REPO_PATH = Path.cwd()
def get_repo(path: str = None) -> Repo:
"""获取Git仓库对象"""
repo_path = Path(path) if path else DEFAULT_REPO_PATH
if not repo_path.exists():
raise ValueError(f"路径不存在: {repo_path}")
try:
return Repo(repo_path)
except GitCommandError:
raise ValueError(f"不是Git仓库: {repo_path}")
@app.list_resources()
async def list_resources() -> list[Resource]:
"""列出Git相关资源"""
try:
repo = get_repo()
resources = [
Resource(
uri="git://status",
name="Git状态",
description="当前仓库的状态信息",
mimeType="text/plain"
),
Resource(
uri="git://branches",
name="分支列表",
description="所有分支信息",
mimeType="text/plain"
),
Resource(
uri="git://log",
name="提交历史",
description="最近的提交记录",
mimeType="text/plain"
)
]
return resources
except Exception as e:
print(f"❌ 获取资源失败: {e}")
return []
@app.read_resource()
async def read_resource(uri: AnyUrl | str) -> Sequence[ReadResourceContents]:
"""读取Git资源"""
uri_str = str(uri).strip()
try:
repo = get_repo()
if uri_str == "git://status":
# Git状态
status_lines = [
f"📁 仓库路径: {repo.working_dir}",
f"🌿 当前分支: {repo.active_branch.name}",
f"📊 状态:",
""
]
# 检查是否有未提交的变更
if repo.is_dirty():
status_lines.append("⚠️ 有未提交的变更")
# 显示修改的文件
changed_files = [item.a_path for item in repo.index.diff(None)]
if changed_files:
status_lines.append("
修改的文件:")
for file in changed_files[:10]:
status_lines.append(f" - {file}")
# 显示未跟踪的文件
untracked = repo.untracked_files
if untracked:
status_lines.append("
未跟踪的文件:")
for file in untracked[:10]:
status_lines.append(f" - {file}")
else:
status_lines.append("✅ 工作区干净")
text = "
".join(status_lines)
return [ReadResourceContents(content=text, mime_type="text/plain")]
elif uri_str == "git://branches":
# 分支列表
lines = ["🌿 分支列表:
"]
for branch in repo.branches:
is_current = branch == repo.active_branch
prefix = "→ " if is_current else " "
lines.append(f"{prefix}{branch.name}")
text = "
".join(lines)
return [ReadResourceContents(content=text, mime_type="text/plain")]
elif uri_str == "git://log":
# 提交历史
lines = ["📝 最近的提交 (前20条):
"]
for i, commit in enumerate(repo.iter_commits(max_count=20), 1):
date = datetime.fromtimestamp(commit.committed_date)
lines.append(f"{i}. {commit.hexsha[:7]} - {commit.summary}")
lines.append(f" 作者: {commit.author.name}")
lines.append(f" 时间: {date.strftime('%Y-%m-%d %H:%M:%S')}
")
text = "
".join(lines)
return [ReadResourceContents(content=text, mime_type="text/plain")]
else:
raise ValueError(f"未知的资源: {uri_str}")
except Exception as e:
raise ValueError(f"读取资源失败: {str(e)}")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="git_log",
description="查看提交历史",
inputSchema={
"type": "object",
"properties": {
"count": {
"type": "integer",
"description": "显示的提交数量",
"default": 10
},
"author": {
"type": "string",
"description": "按作者筛选(可选)"
}
}
}
),
Tool(
name="git_diff",
description="查看文件变更",
inputSchema={
"type": "object",
"properties": {
"commit": {
"type": "string",
"description": "提交hash(可选,默认与HEAD比较)"
}
}
}
),
Tool(
name="git_show",
description="查看特定提交的详细信息",
inputSchema={
"type": "object",
"properties": {
"commit": {
"type": "string",
"description": "提交hash"
}
},
"required": ["commit"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
try:
repo = get_repo()
if name == "git_log":
count = arguments.get("count", 10)
author = arguments.get("author")
print(f"📝 查看提交历史 (前{count}条)")
lines = [f"📝 最近的{count}条提交:
"]
for i, commit in enumerate(repo.iter_commits(max_count=count), 1):
# 如果指定了作者,进行筛选
if author and author not in commit.author.name:
continue
date = datetime.fromtimestamp(commit.committed_date)
lines.append(f"{i}. **{commit.hexsha[:7]}** - {commit.summary}")
lines.append(f" 👤 {commit.author.name} <{commit.author.email}>")
lines.append(f" 📅 {date.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f" 📝 {commit.message.strip()}
")
return [TextContent(type="text", text="
".join(lines))]
elif name == "git_diff":
commit_hash = arguments.get("commit")
print(f"📊 查看变更: {commit_hash or 'HEAD'}")
if commit_hash:
# 查看特定提交的变更
commit = repo.commit(commit_hash)
diffs = commit.diff(commit.parents[0] if commit.parents else None)
else:
# 查看工作区的变更
diffs = repo.index.diff(None)
if not diffs:
return [TextContent(type="text", text="没有变更")]
lines = ["📊 文件变更:
"]
for diff in diffs:
change_type = diff.change_type
lines.append(f" {change_type}: {diff.a_path}")
return [TextContent(type="text", text="
".join(lines))]
elif name == "git_show":
commit_hash = arguments["commit"]
print(f"📖 查看提交: {commit_hash}")
commit = repo.commit(commit_hash)
date = datetime.fromtimestamp(commit.committed_date)
lines = [
f"📖 提交详情
",
f"**Hash**: {commit.hexsha}",
f"**作者**: {commit.author.name} <{commit.author.email}>",
f"**时间**: {date.strftime('%Y-%m-%d %H:%M:%S')}",
f"**提交信息**:
{commit.message}
",
f"**变更文件**:"
]
diffs = commit.diff(commit.parents[0] if commit.parents else None)
for diff in diffs:
lines.append(f" {diff.change_type}: {diff.a_path}")
return [TextContent(type="text", text="
".join(lines))]
else:
raise ValueError(f"未知的工具: {name}")
except Exception as e:
return [TextContent(type="text", text=f"❌ 执行失败: {str(e)}")]
async def main():
"""启动服务器"""
print("=" * 50)
print("🔧 Git操作MCP服务器")
print("=" * 50)
print(f"📁 默认仓库: {DEFAULT_REPO_PATH}")
try:
repo = get_repo()
print(f"✅ Git仓库已加载: {repo.working_dir}")
print(f"🌿 当前分支: {repo.active_branch.name}")
except Exception as e:
print(f"⚠️ 警告: {e}")
print("🚀 服务器启动中...")
print("=" * 50)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
npx @modelcontextprotocol/inspector python3 server.py
在Cursor中配置:
{
"mcpServers": {
"git": {
"command": "python3",
"args": ["/完整路径/git-server/server.py"]
}
}
}
注意:
替换为实际路径,如 服务器会使用当前工作目录的 Git 仓库或者通过环境变量指定仓库路径:
/Users/yourname/git-server/server.py
{
"mcpServers": {
"git": {
"command": "python3",
"args": ["/完整路径/git-server/server.py"],
"env": {
"GIT_REPO_PATH": "/path/to/your/git/repo"
}
}
}
}
在Cursor中测试:
"查看Git仓库的状态"
"显示最近10次提交"
"查看提交 abc123 的详细信息"
"列出所有分支"
4.5 小结:从练习到实战
在本章中,我们构建了4个实用的MCP服务器:
✅ 天气查询服务器
调用真实API获取数据环境变量管理API密钥错误处理和超时控制
✅ 数据库查询服务器
SQLite数据库操作安全的SQL查询(只读)表格格式化输出
✅ 网页抓取服务器
HTML解析和内容提取链接和图片提取网页内容总结
✅ Git操作服务器
Git仓库状态查询提交历史分析文件变更查看
关键学习点:
| 项目 | 核心技能 | 常用库 |
|---|---|---|
| 天气查询 | API调用、环境变量管理 | , |
| 数据库查询 | 数据库操作、SQL安全 | SQLite内置 |
| 网页抓取 | HTML解析、内容提取 | , |
| Git操作 | Git命令封装、版本控制 | |
实战经验:
错误处理:所有外部调用都要有try-except超时控制:网络请求要设置timeout安全验证:验证用户输入,防止注入攻击友好输出:格式化结果,便于AI理解
下一章预告:
在第5章中,我们将学习高级技巧:
错误处理和日志记录安全性和认证性能优化测试和调试部署和分发
这些技巧将让你的MCP服务器更加健壮和专业!
第5章 高级技巧:让MCP服务器更强大
在前面的章节中,我们已经学会了如何创建基本的MCP服务器。现在,让我们学习一些高级技巧,让你的服务器更加健壮、安全、高效和专业。
本章学习目标:
✅ 掌握错误处理和日志记录的最佳实践✅ 了解安全性和认证机制✅ 学会性能优化技巧✅ 掌握测试和调试方法✅ 了解部署和分发流程
5.1 错误处理和日志记录
良好的错误处理和日志记录是专业MCP服务器的基石。它们能帮助你:
🔍 快速定位问题📊 监控服务器运行状态🛡️ 提供更好的用户体验
学习目标:
✅ 创建带完整错误处理的MCP服务器✅ 实现结构化日志记录✅ 测试各种错误场景
项目结构
error-handling-server/
├── server.py # MCP服务器(带错误处理和日志)
├── requirements.txt # 依赖
└── mcp-server.log # 日志文件(运行后生成)
步骤1:创建项目
# 创建项目目录
mkdir error-handling-server
cd error-handling-server
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
# 安装依赖
pip install mcp
创建 :
requirements.txt
mcp>=1.0.0
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
错误处理和日志记录示例服务器
演示如何正确处理错误和记录日志
"""
import asyncio
import time
from typing import Any, Sequence
import logging
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 配置日志 - 同时输出到文件和控制台
log_file = Path(__file__).parent / "mcp-server.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = Server("error-handling-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="divide",
description="执行除法运算,演示错误处理",
inputSchema={
"type": "object",
"properties": {
"a": {
"type": "number",
"description": "被除数"
},
"b": {
"type": "number",
"description": "除数"
}
},
"required": ["a", "b"]
}
),
Tool(
name="get_file_info",
description="获取文件信息,演示文件操作错误处理",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "文件路径"
}
},
"required": ["filepath"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带完善的错误处理和日志"""
start_time = time.time()
try:
logger.info(f"📥 收到工具调用: {name}, 参数: {arguments}")
if name == "divide":
result = await handle_divide(arguments)
duration = time.time() - start_time
logger.info(f"✅ 工具 {name} 执行成功,耗时: {duration:.3f}秒")
return [TextContent(type="text", text=result)]
elif name == "get_file_info":
result = await handle_file_info(arguments)
duration = time.time() - start_time
logger.info(f"✅ 工具 {name} 执行成功,耗时: {duration:.3f}秒")
return [TextContent(type="text", text=result)]
else:
raise ValueError(f"未知的工具: {name}")
except ValueError as e:
# 业务逻辑错误
duration = time.time() - start_time
logger.warning(f"⚠️ 工具 {name} 参数错误: {e}, 耗时: {duration:.3f}秒")
return [TextContent(
type="text",
text=f"❌ 参数错误: {str(e)}"
)]
except Exception as e:
# 未知错误
duration = time.time() - start_time
logger.error(f"❌ 工具 {name} 执行失败: {e}, 耗时: {duration:.3f}秒", exc_info=True)
return [TextContent(
type="text",
text="❌ 服务器内部错误,请稍后重试"
)]
async def handle_divide(arguments: dict) -> str:
"""处理除法运算 - 演示多层错误处理"""
a = arguments.get("a")
b = arguments.get("b")
# 1. 参数验证
if a is None or b is None:
logger.warning(f"参数缺失: a={a}, b={b}")
return "❌ 错误:参数a和b都是必需的"
# 2. 类型验证
try:
a = float(a)
b = float(b)
except (ValueError, TypeError) as e:
logger.error(f"类型转换失败: {e}")
return f"❌ 错误:参数必须是数字,收到 a={a}, b={b}"
# 3. 业务逻辑验证
if b == 0:
logger.warning(f"除零错误: {a} / {b}")
return "❌ 错误:除数不能为0"
# 4. 执行计算
result = a / b
logger.info(f"计算成功: {a} / {b} = {result}")
return f"✅ 结果: {a} ÷ {b} = {result}"
async def handle_file_info(arguments: dict) -> str:
"""处理文件信息获取 - 演示文件操作错误处理"""
filepath = arguments.get("filepath")
if not filepath:
return "❌ 错误:文件路径不能为空"
try:
path = Path(filepath)
if not path.exists():
logger.warning(f"文件不存在: {filepath}")
return f"❌ 错误:文件不存在: {filepath}"
if not path.is_file():
logger.warning(f"不是文件: {filepath}")
return f"❌ 错误:路径不是文件: {filepath}"
# 获取文件信息
stat = path.stat()
size = stat.st_size
modified = time.ctime(stat.st_mtime)
info = f"""📄 文件信息
路径: {filepath}
大小: {size} 字节
修改时间: {modified}"""
logger.info(f"成功获取文件信息: {filepath}")
return info
except PermissionError as e:
logger.error(f"权限错误: {filepath} - {e}")
return f"❌ 错误:没有权限访问文件: {filepath}"
except Exception as e:
logger.error(f"获取文件信息失败: {filepath} - {e}")
return f"❌ 错误:无法获取文件信息: {str(e)}"
async def main():
"""启动服务器"""
print("=" * 60)
print("🔧 错误处理和日志记录示例服务器")
print("=" * 60)
print(f"📝 日志文件: {log_file.absolute()}")
print("🚀 服务器启动中...")
print("=" * 60)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
使用Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中测试以下场景:
正常情况:
调用 ,参数:
divide调用
{"a": 10, "b": 2},参数:
get_file_info
{"filepath": "server.py"}
参数错误:
调用 ,参数:
divide(缺少b)调用
{"a": 10},参数:
divide(类型错误)
{"a": "abc", "b": 2}
业务逻辑错误:
调用 ,参数:
divide(除零错误)
{"a": 10, "b": 0}
文件操作错误:
调用 ,参数:
get_file_info
{"filepath": "不存在的文件.txt"}
查看日志文件:
# 查看日志
cat mcp-server.log
# 实时查看日志(新终端)
tail -f mcp-server.log
在Cursor中配置和测试
编辑Cursor的MCP配置():
~/.cursor/mcp.json
{
"mcpServers": {
"error-handling": {
"command": "/完整路径/error-handling-server/venv/bin/python",
"args": ["/完整路径/error-handling-server/server.py"]
}
}
}
注意:替换为实际路径,如
/Users/yourname/error-handling-server/venv/bin/python
在Cursor中测试:
重启Cursor后,在聊天中尝试:
"使用divide工具计算 100 除以 5"
"使用divide工具计算 10 除以 0"
"获取 server.py 文件的信息"
"获取不存在的文件的信息"
查看日志:
在服务器目录中查看 文件,你会看到详细的日志记录,包括:
mcp-server.log
每个工具调用的时间戳参数信息执行结果错误详情(如果有)
5.1.1 错误处理最佳实践
1. 使用具体的异常类型
import asyncio
from typing import Any, Sequence
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = Server("error-handling-server")
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带完善的错误处理"""
try:
if name == "divide":
a = arguments.get("a")
b = arguments.get("b")
# 参数验证
if a is None or b is None:
logger.warning(f"参数缺失: a={a}, b={b}")
return [TextContent(
type="text",
text="❌ 错误:参数a和b都是必需的"
)]
# 类型验证
try:
a = float(a)
b = float(b)
except (ValueError, TypeError) as e:
logger.error(f"类型转换失败: {e}")
return [TextContent(
type="text",
text=f"❌ 错误:参数必须是数字,收到 a={a}, b={b}"
)]
# 业务逻辑验证
if b == 0:
logger.warning(f"除零错误: {a} / {b}")
return [TextContent(
type="text",
text="❌ 错误:除数不能为0"
)]
# 执行计算
result = a / b
logger.info(f"计算成功: {a} / {b} = {result}")
return [TextContent(
type="text",
text=f"✅ 结果: {a} ÷ {b} = {result}"
)]
else:
raise ValueError(f"未知的工具: {name}")
except ValueError as e:
# 业务逻辑错误
logger.error(f"业务错误: {e}")
return [TextContent(
type="text",
text=f"❌ 错误: {str(e)}"
)]
except Exception as e:
# 未知错误
logger.exception(f"未预期的错误: {e}")
return [TextContent(
type="text",
text="❌ 服务器内部错误,请稍后重试"
)]
2. 错误分类和用户友好的消息
class MCPError(Exception):
"""MCP服务器基础错误类"""
pass
class ValidationError(MCPError):
"""参数验证错误"""
pass
class BusinessLogicError(MCPError):
"""业务逻辑错误"""
pass
class ExternalServiceError(MCPError):
"""外部服务错误"""
pass
def handle_error(error: Exception) -> str:
"""将异常转换为用户友好的错误消息"""
if isinstance(error, ValidationError):
return f"❌ 参数错误: {str(error)}"
elif isinstance(error, BusinessLogicError):
return f"❌ 操作失败: {str(error)}"
elif isinstance(error, ExternalServiceError):
return f"❌ 服务暂时不可用,请稍后重试"
else:
logger.exception("未处理的错误")
return "❌ 服务器错误,请稍后重试"
5.1.2 日志记录最佳实践
1. 使用结构化日志
import logging
import json
from datetime import datetime
class StructuredLogger:
"""结构化日志记录器"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def log_request(self, tool_name: str, arguments: dict):
"""记录请求"""
self.logger.info(json.dumps({
"event": "tool_request",
"tool": tool_name,
"arguments": arguments,
"timestamp": datetime.now().isoformat()
}))
def log_response(self, tool_name: str, success: bool, duration: float):
"""记录响应"""
self.logger.info(json.dumps({
"event": "tool_response",
"tool": tool_name,
"success": success,
"duration_ms": duration * 1000,
"timestamp": datetime.now().isoformat()
}))
def log_error(self, tool_name: str, error: Exception):
"""记录错误"""
self.logger.error(json.dumps({
"event": "tool_error",
"tool": tool_name,
"error_type": type(error).__name__,
"error_message": str(error),
"timestamp": datetime.now().isoformat()
}))
2. 日志级别使用
# DEBUG: 详细的调试信息(开发时使用)
logger.debug(f"处理参数: {arguments}")
# INFO: 一般信息(正常运行)
logger.info(f"工具 {name} 执行成功")
# WARNING: 警告信息(可能的问题)
logger.warning(f"参数 {param} 使用了默认值")
# ERROR: 错误信息(需要关注)
logger.error(f"工具 {name} 执行失败: {error}")
# CRITICAL: 严重错误(系统级问题)
logger.critical(f"服务器无法启动: {error}")
5.1.3 实际应用示例
import asyncio
import time
from typing import Any, Sequence
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp-server.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = Server("robust-server")
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 完整的错误处理和日志"""
start_time = time.time()
try:
logger.info(f"收到工具调用: {name}, 参数: {arguments}")
# 执行工具逻辑
result = await execute_tool_logic(name, arguments)
duration = time.time() - start_time
logger.info(f"工具 {name} 执行成功,耗时: {duration:.2f}秒")
return [TextContent(type="text", text=result)]
except ValueError as e:
duration = time.time() - start_time
logger.warning(f"工具 {name} 参数错误: {e}, 耗时: {duration:.2f}秒")
return [TextContent(
type="text",
text=f"❌ 参数错误: {str(e)}"
)]
except Exception as e:
duration = time.time() - start_time
logger.error(f"工具 {name} 执行失败: {e}, 耗时: {duration:.2f}秒", exc_info=True)
return [TextContent(
type="text",
text="❌ 服务器错误,请稍后重试"
)]
async def execute_tool_logic(name: str, arguments: dict) -> str:
"""工具逻辑(示例)"""
if name == "example":
# 你的业务逻辑
return "✅ 执行成功"
raise ValueError(f"未知的工具: {name}")
关键要点:
✅ 总是捕获具体的异常类型✅ 记录足够的上下文信息✅ 给用户友好的错误消息✅ 记录执行时间用于性能分析✅ 使用日志文件持久化记录
5.2 安全性和认证
安全性是MCP服务器的重要考虑因素,特别是当服务器处理敏感数据或执行危险操作时。
学习目标:
✅ 实现输入验证和参数校验✅ 防止SQL注入攻击✅ 安全地管理API密钥✅ 实现权限控制机制
项目结构
security-server/
├── server.py # MCP服务器(带安全验证)
├── requirements.txt # 依赖
├── .env # 环境变量(包含API密钥)
└── database.db # SQLite数据库(示例)
步骤1:创建项目
# 创建项目目录
mkdir security-server
cd security-server
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
# 安装依赖
pip install mcp python-dotenv
创建 :
requirements.txt
mcp>=1.0.0
python-dotenv>=1.0.0
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
安全性和认证示例服务器
演示输入验证、SQL注入防护、API密钥管理
"""
import asyncio
import os
import re
import sqlite3
import hmac
from typing import Any, Sequence
from pathlib import Path
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# 加载环境变量
load_dotenv()
# 从环境变量读取API密钥
API_KEY = os.getenv("API_KEY", "demo-secret-key-12345")
app = Server("security-server")
# 初始化示例数据库
DB_PATH = Path(__file__).parent / "database.db"
def init_database():
"""初始化示例数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 创建用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE
)
""")
# 插入示例数据
cursor.execute("""
INSERT OR IGNORE INTO users (name, email) VALUES
('张三', 'zhangsan@example.com'),
('李四', 'lisi@example.com'),
('王五', 'wangwu@example.com')
""")
conn.commit()
conn.close()
# 启动时初始化数据库
init_database()
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="validate_email",
description="验证邮箱格式,演示输入验证",
inputSchema={
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "要验证的邮箱地址"
}
},
"required": ["email"]
}
),
Tool(
name="search_user",
description="搜索用户(使用参数化查询防止SQL注入)",
inputSchema={
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "搜索关键词(用户名)"
}
},
"required": ["keyword"]
}
),
Tool(
name="protected_operation",
description="需要API密钥的操作,演示密钥验证",
inputSchema={
"type": "object",
"properties": {
"api_key": {
"type": "string",
"description": "API密钥"
},
"action": {
"type": "string",
"description": "要执行的操作"
}
},
"required": ["api_key", "action"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带安全验证"""
try:
if name == "validate_email":
email = arguments.get("email", "")
if validate_email(email):
return [TextContent(
type="text",
text=f"✅ 邮箱格式有效: {email}"
)]
else:
return [TextContent(
type="text",
text=f"❌ 邮箱格式无效: {email}"
)]
elif name == "search_user":
keyword = arguments.get("keyword", "")
results = safe_search_users(keyword)
return [TextContent(
type="text",
text=f"找到 {len(results)} 个用户:
" + "
".join([f"- {r['name']} ({r['email']})" for r in results])
)]
elif name == "protected_operation":
api_key = arguments.get("api_key", "")
action = arguments.get("action", "")
if not verify_api_key(api_key):
return [TextContent(
type="text",
text="❌ 错误:无效的API密钥"
)]
return [TextContent(
type="text",
text=f"✅ 操作 '{action}' 执行成功(已通过API密钥验证)"
)]
else:
raise ValueError(f"未知的工具: {name}")
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 错误: {str(e)}"
)]
def validate_email(email: str) -> bool:
"""验证邮箱格式"""
if not email:
return False
# 邮箱格式正则表达式
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
def safe_search_users(keyword: str) -> list[dict]:
"""安全的用户搜索 - 使用参数化查询防止SQL注入"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# ✅ 正确:使用参数化查询
sql = "SELECT * FROM users WHERE name LIKE ?"
cursor.execute(sql, (f"%{keyword}%",))
# ❌ 错误示例(不要这样做):
# sql = f"SELECT * FROM users WHERE name LIKE '%{keyword}%'"
# cursor.execute(sql) # 容易被SQL注入攻击
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
def verify_api_key(provided_key: str) -> bool:
"""安全地验证API密钥(使用时间安全的比较)"""
if not provided_key or not API_KEY:
return False
# 使用hmac.compare_digest防止时序攻击
return hmac.compare_digest(provided_key.encode(), API_KEY.encode())
async def main():
"""启动服务器"""
print("=" * 60)
print("🔒 安全性和认证示例服务器")
print("=" * 60)
print(f"📁 数据库: {DB_PATH.absolute()}")
print("🚀 服务器启动中...")
print("=" * 60)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
创建 文件:
.env
# API密钥配置
API_KEY=demo-secret-key-12345
注意: 文件包含敏感信息,不要提交到版本控制系统!
.env
步骤3:测试服务器
使用Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中测试以下场景:
邮箱验证:
调用 ,参数:
validate_email ✅调用
{"email": "test@example.com"},参数:
validate_email ❌
{"email": "invalid-email"}
安全用户搜索:
调用 ,参数:
search_user ✅尝试SQL注入:
{"keyword": "张"} ✅(会被安全处理)
{"keyword": "' OR '1'='1"}
API密钥验证:
调用 ,参数:
protected_operation ✅调用
{"api_key": "demo-secret-key-12345", "action": "test"},参数:
protected_operation ❌
{"api_key": "wrong-key", "action": "test"}
在Cursor中配置和测试
编辑Cursor的MCP配置():
~/.cursor/mcp.json
{
"mcpServers": {
"security": {
"command": "/完整路径/security-server/venv/bin/python",
"args": ["/完整路径/security-server/server.py"]
}
}
}
在Cursor中测试:
重启Cursor后,在聊天中尝试:
"验证邮箱 test@example.com"
"搜索用户名为'张'的用户"
"使用API密钥 demo-secret-key-12345 执行操作 test"
5.2.1 输入验证
1. 参数验证
from typing import Any
import re
def validate_email(email: str) -> bool:
"""验证邮箱格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
def validate_url(url: str) -> bool:
"""验证URL格式"""
pattern = r'^https?://[^s/$.?#].[^s]*$'
return bool(re.match(pattern, url))
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带输入验证"""
if name == "send_email":
email = arguments.get("email")
message = arguments.get("message")
# 验证邮箱
if not email or not validate_email(email):
return [TextContent(
type="text",
text="❌ 错误:无效的邮箱地址"
)]
# 验证消息长度
if not message or len(message) > 10000:
return [TextContent(
type="text",
text="❌ 错误:消息不能为空且不能超过10000字符"
)]
# 执行发送逻辑
# ...
2. SQL注入防护
import sqlite3
def safe_query(sql: str, params: tuple = None) -> list[dict]:
"""安全的SQL查询 - 使用参数化查询"""
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# ✅ 正确:使用参数化查询
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
# ❌ 错误:直接拼接SQL(容易被注入)
# cursor.execute(f"SELECT * FROM users WHERE name = '{name}'")
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - SQL安全"""
if name == "search_user":
username = arguments.get("username", "")
# ✅ 使用参数化查询
sql = "SELECT * FROM users WHERE name LIKE ?"
results = safe_query(sql, (f"%{username}%",))
return [TextContent(
type="text",
text=f"找到 {len(results)} 个用户"
)]
5.2.2 API密钥管理
1. 环境变量管理
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 从环境变量读取密钥
API_KEY = os.getenv("API_KEY")
if not API_KEY:
raise ValueError("API_KEY环境变量未设置")
# 使用密钥
def call_external_api(data):
headers = {"Authorization": f"Bearer {API_KEY}"}
# 调用API...
2. 密钥验证
import hashlib
import hmac
def verify_api_key(provided_key: str, expected_key: str) -> bool:
"""安全地比较API密钥"""
return hmac.compare_digest(provided_key, expected_key)
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带API密钥验证"""
api_key = arguments.get("api_key")
if not verify_api_key(api_key, os.getenv("API_KEY")):
return [TextContent(
type="text",
text="❌ 错误:无效的API密钥"
)]
# 执行工具逻辑...
5.2.3 权限控制
from enum import Enum
class Permission(Enum):
READ = "read"
WRITE = "write"
ADMIN = "admin"
def check_permission(user: str, required_permission: Permission) -> bool:
"""检查用户权限"""
user_permissions = get_user_permissions(user)
return required_permission.value in user_permissions
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 带权限检查"""
user = arguments.get("user")
if name == "delete_data":
if not check_permission(user, Permission.ADMIN):
return [TextContent(
type="text",
text="❌ 错误:权限不足,需要管理员权限"
)]
# 执行删除操作...
安全最佳实践:
✅ 永远不要信任用户输入✅ 使用参数化查询防止SQL注入✅ 将敏感信息存储在环境变量中✅ 使用HTTPS传输敏感数据✅ 实现适当的权限控制✅ 记录所有安全相关事件
5.3 性能优化
性能优化能让你的MCP服务器更快、更高效地响应请求。
学习目标:
✅ 实现缓存机制减少重复计算✅ 使用异步操作提高并发性能✅ 优化数据库连接管理
项目结构
performance-server/
├── server.py # MCP服务器(带性能优化)
├── requirements.txt # 依赖
└── cache_data/ # 缓存目录(自动创建)
步骤1:创建项目
# 创建项目目录
mkdir performance-server
cd performance-server
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
# 安装依赖
pip install mcp aiohttp
创建 :
requirements.txt
mcp>=1.0.0
aiohttp>=3.9.0
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
性能优化示例服务器
演示缓存、异步操作、连接池等优化技巧
"""
import asyncio
import time
from typing import Any, Sequence
from functools import lru_cache
from pathlib import Path
import aiohttp
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("performance-server")
# 缓存字典(手动实现TTL缓存)
cache = {}
cache_ttl = {}
def get_cached_data(key: str, ttl: int = 300) -> str | None:
"""获取缓存数据(带TTL)"""
now = time.time()
# 检查缓存是否存在且未过期
if key in cache and now < cache_ttl.get(key, 0):
return cache[key]
# 缓存不存在或已过期
if key in cache:
del cache[key]
del cache_ttl[key]
return None
def set_cached_data(key: str, value: str, ttl: int = 300):
"""设置缓存数据(带TTL)"""
cache[key] = value
cache_ttl[key] = time.time() + ttl
@lru_cache(maxsize=128)
def expensive_calculation(n: int) -> int:
"""昂贵的计算(使用LRU缓存)"""
time.sleep(0.1) # 模拟耗时操作
return n * n
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="cached_calculation",
description="带缓存的平方计算(演示LRU缓存)",
inputSchema={
"type": "object",
"properties": {
"number": {
"type": "integer",
"description": "要计算的数字"
}
},
"required": ["number"]
}
),
Tool(
name="fetch_url",
description="获取URL内容(带TTL缓存)",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "要获取的URL"
}
},
"required": ["url"]
}
),
Tool(
name="fetch_multiple_urls",
description="并发获取多个URL(演示异步优化)",
inputSchema={
"type": "object",
"properties": {
"urls": {
"type": "array",
"items": {"type": "string"},
"description": "URL列表"
}
},
"required": ["urls"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
start_time = time.time()
try:
if name == "cached_calculation":
number = arguments.get("number")
if number is None:
return [TextContent(type="text", text="❌ 错误:缺少参数 number")]
# 第一次调用会慢,后续调用会从缓存获取
result = expensive_calculation(number)
duration = time.time() - start_time
return [TextContent(
type="text",
text=f"✅ {number}² = {result}
⏱️ 耗时: {duration:.3f}秒"
)]
elif name == "fetch_url":
url = arguments.get("url", "")
if not url:
return [TextContent(type="text", text="❌ 错误:缺少参数 url")]
# 检查缓存
cached = get_cached_data(url, ttl=60) # 缓存60秒
if cached:
duration = time.time() - start_time
return [TextContent(
type="text",
text=f"✅ 从缓存获取(缓存60秒)
📄 内容长度: {len(cached)} 字符
⏱️ 耗时: {duration:.3f}秒"
)]
# 获取URL内容
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
content = await response.text()
set_cached_data(url, content, ttl=60)
duration = time.time() - start_time
return [TextContent(
type="text",
text=f"✅ 获取成功
📄 内容长度: {len(content)} 字符
⏱️ 耗时: {duration:.3f}秒"
)]
elif name == "fetch_multiple_urls":
urls = arguments.get("urls", [])
if not urls:
return [TextContent(type="text", text="❌ 错误:缺少参数 urls")]
# 并发获取所有URL
results = await fetch_multiple_urls(urls)
duration = time.time() - start_time
success_count = sum(1 for r in results if r["success"])
total_size = sum(r.get("size", 0) for r in results)
return [TextContent(
type="text",
text=f"✅ 并发获取完成
"
f"📊 成功: {success_count}/{len(urls)}
"
f"📄 总大小: {total_size} 字符
"
f"⏱️ 总耗时: {duration:.3f}秒
"
f"⚡ 平均每个: {duration/len(urls):.3f}秒"
)]
else:
raise ValueError(f"未知的工具: {name}")
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 错误: {str(e)}"
)]
async def fetch_multiple_urls(urls: list[str]) -> list[dict]:
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_single_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
processed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed.append({
"url": urls[i],
"success": False,
"error": str(result)
})
else:
processed.append(result)
return processed
async def fetch_single_url(session: aiohttp.ClientSession, url: str) -> dict:
"""获取单个URL"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
content = await response.text()
return {
"url": url,
"success": True,
"size": len(content)
}
except Exception as e:
return {
"url": url,
"success": False,
"error": str(e)
}
async def main():
"""启动服务器"""
print("=" * 60)
print("⚡ 性能优化示例服务器")
print("=" * 60)
print("🚀 服务器启动中...")
print("=" * 60)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:测试服务器
使用Inspector测试:
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中测试以下场景:
LRU缓存测试:
第一次调用 ,参数:
cached_calculation(会慢)再次调用相同参数(会快,从缓存获取)
{"number": 10}
TTL缓存测试:
调用 ,参数:
fetch_url(第一次慢)立即再次调用(会快,从缓存获取)
{"url": "https://www.python.org"}
异步并发测试:
调用 ,参数:
fetch_multiple_urls观察并发执行的速度优势
{"urls": ["https://www.python.org", "https://www.github.com", "https://www.stackoverflow.com"]}
在Cursor中配置和测试
编辑Cursor的MCP配置():
~/.cursor/mcp.json
{
"mcpServers": {
"performance": {
"command": "/完整路径/performance-server/venv/bin/python",
"args": ["/完整路径/performance-server/server.py"]
}
}
}
在Cursor中测试:
重启Cursor后,在聊天中尝试:
"计算 100 的平方"
"再次计算 100 的平方"(应该更快)
"获取 https://www.python.org 的内容"
"并发获取 https://www.python.org 和 https://www.github.com 的内容"
5.3.1 缓存机制
from functools import lru_cache
import time
# 使用LRU缓存
@lru_cache(maxsize=128)
def expensive_operation(key: str) -> str:
"""昂贵的操作(会被缓存)"""
time.sleep(1) # 模拟耗时操作
return f"结果: {key}"
# 手动缓存实现
cache = {}
cache_ttl = {} # 缓存过期时间
def get_cached_data(key: str, ttl: int = 300) -> str:
"""获取缓存数据"""
now = time.time()
# 检查缓存是否存在且未过期
if key in cache and now < cache_ttl.get(key, 0):
return cache[key]
# 缓存不存在或已过期,重新计算
data = expensive_operation(key)
cache[key] = data
cache_ttl[key] = now + ttl
return data
5.3.2 异步操作
import asyncio
import aiohttp
async def fetch_multiple_urls(urls: list[str]) -> list[dict]:
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""获取单个URL"""
async with session.get(url) as response:
return await response.json()
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具 - 使用异步优化"""
if name == "fetch_data":
urls = arguments.get("urls", [])
# 并发获取所有URL
results = await fetch_multiple_urls(urls)
return [TextContent(
type="text",
text=f"获取了 {len(results)} 个URL的数据"
)]
5.3.3 数据库连接池
import sqlite3
from contextlib import contextmanager
# 连接池(简化版)
class ConnectionPool:
def __init__(self, db_path: str, max_connections: int = 5):
self.db_path = db_path
self.pool = []
self.max_connections = max_connections
@contextmanager
def get_connection(self):
"""获取数据库连接"""
if self.pool:
conn = self.pool.pop()
else:
conn = sqlite3.connect(self.db_path)
try:
yield conn
finally:
if len(self.pool) < self.max_connections:
self.pool.append(conn)
else:
conn.close()
# 使用连接池
pool = ConnectionPool("database.db")
def query_with_pool(sql: str):
"""使用连接池查询"""
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql)
return cursor.fetchall()
性能优化要点:
✅ 使用缓存减少重复计算✅ 使用异步操作提高并发性能✅ 使用连接池管理数据库连接✅ 避免不必要的数据库查询✅ 优化数据结构选择
5.4 测试和调试
测试和调试是确保MCP服务器质量的关键。
学习目标:
✅ 编写单元测试覆盖核心功能✅ 编写集成测试验证端到端流程✅ 使用Mock对象隔离外部依赖✅ 掌握调试技巧
项目结构
testing-server/
├── server.py # MCP服务器
├── test_server.py # 测试文件
├── requirements.txt # 依赖
└── requirements-dev.txt # 开发依赖
步骤1:创建项目
# 创建项目目录
mkdir testing-server
cd testing-server
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
# 安装依赖
pip install mcp
pip install pytest pytest-asyncio # 测试框架
创建 :
requirements.txt
mcp>=1.0.0
创建 :
requirements-dev.txt
pytest>=7.4.0
pytest-asyncio>=0.21.0
步骤2:编写服务器代码
创建 :
server.py
#!/usr/bin/env python3
"""
测试示例服务器
演示如何编写可测试的MCP服务器
"""
import asyncio
from typing import Any, Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("testing-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="add",
description="加法运算",
inputSchema={
"type": "object",
"properties": {
"a": {"type": "number", "description": "第一个数字"},
"b": {"type": "number", "description": "第二个数字"}
},
"required": ["a", "b"]
}
),
Tool(
name="get_weather",
description="获取天气(需要外部API)",
inputSchema={
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"}
},
"required": ["city"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
if name == "add":
a = arguments.get("a", 0)
b = arguments.get("b", 0)
result = add_numbers(a, b)
return [TextContent(type="text", text=f"✅ 结果: {a} + {b} = {result}")]
elif name == "get_weather":
city = arguments.get("city", "")
weather = await fetch_weather(city)
return [TextContent(type="text", text=weather)]
else:
raise ValueError(f"未知的工具: {name}")
def add_numbers(a: float, b: float) -> float:
"""加法函数(纯函数,易于测试)"""
return a + b
async def fetch_weather(city: str) -> str:
"""获取天气(模拟外部API调用)"""
# 在实际应用中,这里会调用真实的天气API
# 为了测试,我们使用模拟数据
await asyncio.sleep(0.1) # 模拟网络延迟
return f"🌤️ {city}的天气:晴天,25°C"
async def main():
"""启动服务器"""
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:编写测试代码
创建 :
test_server.py
#!/usr/bin/env python3
"""
MCP服务器测试
演示单元测试和集成测试
"""
import pytest
from unittest.mock import Mock, patch, AsyncMock
import asyncio
# 导入服务器代码
from server import add_numbers, fetch_weather, app
class TestAddNumbers:
"""测试加法函数(单元测试)"""
def test_add_positive_numbers(self):
"""测试正数相加"""
assert add_numbers(2, 3) == 5
assert add_numbers(10, 20) == 30
def test_add_negative_numbers(self):
"""测试负数相加"""
assert add_numbers(-2, -3) == -5
assert add_numbers(-10, 5) == -5
def test_add_zero(self):
"""测试零相加"""
assert add_numbers(0, 0) == 0
assert add_numbers(5, 0) == 5
assert add_numbers(0, 5) == 5
def test_add_decimals(self):
"""测试小数相加"""
assert add_numbers(1.5, 2.5) == 4.0
assert add_numbers(0.1, 0.2) == pytest.approx(0.3)
@pytest.mark.asyncio
class TestFetchWeather:
"""测试天气获取函数(异步测试)"""
async def test_fetch_weather_success(self):
"""测试成功获取天气"""
result = await fetch_weather("北京")
assert "北京" in result
assert "天气" in result
async def test_fetch_weather_empty_city(self):
"""测试空城市名"""
result = await fetch_weather("")
assert "天气" in result
@pytest.mark.asyncio
class TestMCPTools:
"""测试MCP工具(集成测试)"""
async def test_list_tools(self):
"""测试列出工具"""
tools = await app.list_tools()
assert len(tools) > 0
tool_names = [tool.name for tool in tools]
assert "add" in tool_names
assert "get_weather" in tool_names
async def test_call_tool_add(self):
"""测试调用add工具"""
result = await app.call_tool("add", {"a": 5, "b": 3})
assert len(result) > 0
assert "8" in result[0].text
async def test_call_tool_add_missing_param(self):
"""测试add工具缺少参数"""
with pytest.raises(KeyError):
await app.call_tool("add", {"a": 5})
async def test_call_tool_unknown(self):
"""测试未知工具"""
with pytest.raises(ValueError):
await app.call_tool("unknown_tool", {})
@pytest.mark.asyncio
class TestWithMocks:
"""使用Mock对象测试(隔离外部依赖)"""
@patch('server.fetch_weather')
async def test_get_weather_with_mock(self, mock_fetch_weather):
"""使用Mock测试天气获取"""
# 设置Mock返回值
mock_fetch_weather.return_value = "🌧️ 测试城市:雨天,20°C"
# 调用工具
result = await app.call_tool("get_weather", {"city": "测试城市"})
# 验证结果
assert len(result) > 0
assert "测试城市" in result[0].text
# 验证Mock被调用
mock_fetch_weather.assert_called_once_with("测试城市")
if __name__ == "__main__":
# 运行测试
pytest.main([__file__, "-v"])
步骤4:运行测试
运行所有测试:
# 使用pytest运行测试
pytest test_server.py -v
# 或者使用Python直接运行
python test_server.py
运行特定测试:
# 只运行单元测试
pytest test_server.py::TestAddNumbers -v
# 只运行异步测试
pytest test_server.py::TestFetchWeather -v
# 运行带Mock的测试
pytest test_server.py::TestWithMocks -v
查看测试覆盖率:
# 安装覆盖率工具
pip install pytest-cov
# 运行测试并生成覆盖率报告
pytest test_server.py --cov=server --cov-report=html
# 查看HTML报告
open htmlcov/index.html # Mac
# start htmlcov/index.html # Windows
步骤5:使用Inspector调试
启动Inspector:
npx @modelcontextprotocol/inspector python3 server.py
在Inspector中可以:
查看所有可用工具测试工具调用查看请求和响应调试错误
在Cursor中配置和测试
编辑Cursor的MCP配置():
~/.cursor/mcp.json
{
"mcpServers": {
"testing": {
"command": "/完整路径/testing-server/venv/bin/python",
"args": ["/完整路径/testing-server/server.py"]
}
}
}
在Cursor中测试:
重启Cursor后,在聊天中尝试:
"计算 10 加 20"
"获取北京的天气"
5.4.1 单元测试
import unittest
from unittest.mock import Mock, patch
class TestMCPServer(unittest.TestCase):
"""MCP服务器单元测试"""
def setUp(self):
"""测试前准备"""
self.app = Server("test-server")
def test_tool_execution(self):
"""测试工具执行"""
# 模拟工具调用
result = await self.app.call_tool("test_tool", {"arg": "value"})
self.assertIsNotNone(result)
@patch('requests.get')
def test_external_api_call(self, mock_get):
"""测试外部API调用"""
# 模拟API响应
mock_get.return_value.json.return_value = {"status": "ok"}
# 测试代码
result = call_external_api()
self.assertEqual(result["status"], "ok")
if __name__ == "__main__":
unittest.main()
5.4.2 集成测试
import asyncio
from mcp.server.stdio import stdio_server
async def test_server_integration():
"""集成测试"""
app = Server("test-server")
# 测试工具列表
tools = await app.list_tools()
assert len(tools) > 0
# 测试工具调用
result = await app.call_tool("test_tool", {})
assert result is not None
# 运行测试
asyncio.run(test_server_integration())
5.4.3 调试技巧
import logging
import pdb
# 1. 使用日志调试
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def debug_function():
logger.debug("进入函数")
logger.debug(f"参数: {args}")
# 你的代码
logger.debug("函数执行完成")
# 2. 使用断点调试
def debug_with_breakpoint():
pdb.set_trace() # 设置断点
# 你的代码
# 3. 使用MCP Inspector调试
# 运行: npx @modelcontextprotocol/inspector python3 server.py
测试最佳实践:
✅ 编写单元测试覆盖核心功能✅ 编写集成测试验证端到端流程✅ 使用Mock对象隔离外部依赖✅ 测试错误情况和边界条件✅ 保持测试代码简洁和可维护
5.5 部署和分发
部署和分发让你的MCP服务器可以被其他人使用。
学习目标:
✅ 使用setup.py打包项目✅ 编写清晰的README文档✅ 管理版本和配置✅ 发布到PyPI(可选)
项目结构
my-mcp-server/
├── my_mcp_server/ # 包目录
│ ├── __init__.py
│ ├── server.py # 服务器代码
│ └── version.py # 版本信息
├── tests/ # 测试目录
│ └── test_server.py
├── setup.py # 打包配置
├── README.md # 项目文档
├── requirements.txt # 依赖
└── .gitignore # Git忽略文件
步骤1:创建项目结构
# 创建项目目录
mkdir my-mcp-server
cd my-mcp-server
# 创建包目录
mkdir my_mcp_server
mkdir tests
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Mac/Linux
# venvScriptsactivate # Windows
步骤2:编写服务器代码
创建 :
my_mcp_server/__init__.py
"""My MCP Server - 一个示例MCP服务器"""
from .version import __version__
__all__ = ["__version__"]
创建 :
my_mcp_server/version.py
"""版本信息"""
__version__ = "1.0.0"
创建 :
my_mcp_server/server.py
#!/usr/bin/env python3
"""
My MCP Server
一个示例MCP服务器,演示如何打包和分发
"""
import asyncio
from typing import Any, Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .version import __version__
app = Server("my-mcp-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="greet",
description="打招呼工具",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "要打招呼的名字"
}
},
"required": ["name"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
"""执行工具"""
if name == "greet":
name = arguments.get("name", "朋友")
return [TextContent(
type="text",
text=f"👋 你好,{name}!欢迎使用 My MCP Server v{__version__}"
)]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
"""启动服务器(作为命令行入口点)"""
print(f"My MCP Server v{__version__}")
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
步骤3:创建setup.py
创建 :
setup.py
#!/usr/bin/env python3
"""
setup.py - 打包配置
"""
from setuptools import setup, find_packages
from pathlib import Path
# 读取README
readme_file = Path(__file__).parent / "README.md"
long_description = readme_file.read_text(encoding="utf-8") if readme_file.exists() else ""
# 读取版本
version_file = Path(__file__).parent / "my_mcp_server" / "version.py"
version = {}
exec(version_file.read_text(encoding="utf-8"), version)
setup(
name="my-mcp-server",
version=version["__version__"],
description="一个示例MCP服务器",
long_description=long_description,
long_description_content_type="text/markdown",
author="Your Name",
author_email="your.email@example.com",
url="https://github.com/yourusername/my-mcp-server",
packages=find_packages(),
install_requires=[
"mcp>=1.0.0",
],
python_requires=">=3.8",
entry_points={
"console_scripts": [
"my-mcp-server=my_mcp_server.server:main",
],
},
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
)
步骤4:创建README.md
创建 ,内容如下:
README.md
# My MCP Server
一个示例MCP服务器,演示如何打包和分发MCP服务器。
## 功能
- ✅ 打招呼工具(greet)
## 安装
### 从源码安装
# 克隆仓库
git clone https://github.com/yourusername/my-mcp-server.git
cd my-mcp-server
# 安装
pip install -e .
### 从PyPI安装(如果已发布)
pip install my-mcp-server
## 配置
在Cursor的MCP配置中添加(`~/.cursor/mcp.json`):
{
"mcpServers": {
"my-server": {
"command": "my-mcp-server"
}
}
}
**注意**:如果从源码安装,需要使用完整路径:
{
"mcpServers": {
"my-server": {
"command": "/完整路径/my-mcp-server/venv/bin/python",
"args": ["-m", "my_mcp_server.server"]
}
}
}
## 使用
重启Cursor后,在聊天中使用:
"使用greet工具向张三打招呼"
## 开发
# 安装开发依赖
pip install -e ".[dev]"
# 运行测试
pytest
# 运行服务器
python -m my_mcp_server.server
## 许可证
MIT License
步骤5:创建其他配置文件
创建 :
requirements.txt
mcp>=1.0.0
创建 :
.gitignore
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
env/
ENV/
# 打包
build/
dist/
*.egg-info/
# IDE
.vscode/
.idea/
*.swp
*.swo
# 测试
.pytest_cache/
.coverage
htmlcov/
步骤6:打包项目
构建分发包:
# 安装构建工具
pip install build
# 构建分发包
python -m build
# 这会生成:
# - dist/my_mcp_server-1.0.0.tar.gz (源码包)
# - dist/my_mcp_server-1.0.0-py3-none-any.whl (wheel包)
本地测试安装:
# 安装到虚拟环境
pip install dist/my_mcp_server-1.0.0-py3-none-any.whl
# 测试命令是否可用
my-mcp-server --help
步骤7:发布到PyPI(可选)
准备发布:
# 安装发布工具
pip install twine
# 检查分发包
twine check dist/*
发布到PyPI:
# 发布到测试PyPI(先测试)
twine upload --repository testpypi dist/*
# 发布到正式PyPI
twine upload dist/*
注意:需要PyPI账号和API token。
步骤8:版本管理
更新版本时,修改 :
my_mcp_server/version.py
__version__ = "1.0.1" # 从 1.0.0 升级到 1.0.1
然后重新打包:
python -m build
在Cursor中配置和测试
如果已安装到系统:
{
"mcpServers": {
"my-server": {
"command": "my-mcp-server"
}
}
}
如果从源码运行:
{
"mcpServers": {
"my-server": {
"command": "/完整路径/my-mcp-server/venv/bin/python",
"args": ["-m", "my_mcp_server.server"]
}
}
}
在Cursor中测试:
重启Cursor后,在聊天中尝试:
"使用greet工具向世界打招呼"
5.5.1 打包发布
1. 创建setup.py
from setuptools import setup, find_packages
setup(
name="my-mcp-server",
version="1.0.0",
description="我的MCP服务器",
author="Your Name",
packages=find_packages(),
install_requires=[
"mcp>=1.0.0",
"requests>=2.31.0",
],
entry_points={
"console_scripts": [
"my-mcp-server=my_mcp_server.server:main",
],
},
)
2. 创建README.md
# My MCP Server
一个功能强大的MCP服务器。
## 安装
pip install my-mcp-server
## 配置
在Cursor的MCP配置中添加:
{
"mcpServers": {
"my-server": {
"command": "my-mcp-server"
}
}
}
## 使用
重启Cursor后,在聊天中使用:
"使用我的工具做某事"
5.5.2 版本管理
# version.py
__version__ = "1.0.0"
# server.py
from .version import __version__
@app.list_tools()
async def list_tools():
logger.info(f"服务器版本: {__version__}")
# ...
5.5.3 配置管理
import json
from pathlib import Path
class Config:
"""配置管理"""
def __init__(self, config_path: Path = None):
self.config_path = config_path or Path.home() / ".mcp" / "config.json"
self.config = self.load_config()
def load_config(self) -> dict:
"""加载配置"""
if self.config_path.exists():
with open(self.config_path) as f:
return json.load(f)
return {}
def get(self, key: str, default=None):
"""获取配置值"""
return self.config.get(key, default)
def set(self, key: str, value):
"""设置配置值"""
self.config[key] = value
self.save_config()
def save_config(self):
"""保存配置"""
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self.config, f, indent=2)
# 使用配置
config = Config()
api_key = config.get("api_key")
部署要点:
✅ 使用setup.py打包项目✅ 编写清晰的README文档✅ 使用版本号管理发布✅ 提供配置示例✅ 考虑向后兼容性
5.6 小结:最佳实践
在本章中,我们学习了MCP服务器的高级技巧:
✅ 错误处理和日志记录
使用具体的异常类型记录结构化日志提供用户友好的错误消息
✅ 安全性和认证
验证所有用户输入使用参数化查询防止SQL注入安全地管理API密钥实现权限控制
✅ 性能优化
使用缓存减少重复计算使用异步操作提高并发使用连接池管理资源
✅ 测试和调试
编写单元测试和集成测试使用Mock对象隔离依赖使用日志和断点调试
✅ 部署和分发
使用setup.py打包项目编写清晰的文档管理版本和配置
完整的最佳实践清单:
| 类别 | 实践 | 重要性 |
|---|---|---|
| 错误处理 | 捕获具体异常、记录日志、友好消息 | ⭐⭐⭐⭐⭐ |
| 安全性 | 输入验证、SQL注入防护、密钥管理 | ⭐⭐⭐⭐⭐ |
| 性能 | 缓存、异步、连接池 | ⭐⭐⭐⭐ |
| 测试 | 单元测试、集成测试 | ⭐⭐⭐⭐ |
| 部署 | 打包、文档、版本管理 | ⭐⭐⭐ |
下一步:
现在你已经掌握了MCP的核心技能和高级技巧!你可以:
🚀 创建自己的MCP服务器📚 阅读MCP官方文档了解更多细节🤝 参与MCP社区,分享你的项目🔧 持续优化和改进你的服务器
恭喜你完成了MCP的学习之旅! 🎉
现在,你已经具备了创建专业级MCP服务器的所有技能。去构建一些令人惊叹的东西吧!


