核心架构
了解如何应用程序如何通过nxn协议接入nxnos
NXN协议建立在灵活、可扩展的架构之上,实现了NXN应用程序和NXNOS之间的无缝通信。本文档涵盖了核心架构组件和概念。
概述
NXN协议遵循客户端-服务端架构,其中:
· NXNOS 是运行NXNOS的服务器
· 客户端 在应用程序内与APP Server保持1:1连接
· 应用程序服务端 为客户端提供工具和能力
架构图 |
核心组件
协议层
协议层处理消息帧、请求/响应链接和高级通信模式。
``python
class Session(BaseSession[RequestT, NotificationT, ResultT]):
async def send_request(
self,
request: RequestT,
result_type: type[Result]
) -> Result:
"""发送请求并等待响应。如果响应包含错误,则引发Error."""
# 请求处理实施
async def send_notification(
self,
notification: NotificationT
) -> None:
"""发送不需要响应的单向通知。"""
# 通知处理实施
async def _received_request(
self,
responder: RequestResponder[ReceiveRequestT, ResultT]
) -> None:
"""处理收到的请求。"""
# 请求处理实施
async def _received_notification(
self,
notification: ReceiveNotificationT
) -> None:
"""处理收到的通知。"""
# 通知处理实施
```
主要课程包括:
- 协议
- 客户端
- 应用程序服务端
传输层
传输层处理客户端和应用程序服务端之间的实际通信。NXN协议支持多种传输机制:
- HTTP与SSE传输对应用程序服务端到客户端的消息使用服务端发送事件客户端到服务端消息的HTTP POST
所有传输都使用JSON-RPC2.0用于交换消息。有关模型上下文协议消息格式的详细信息,请参阅[规范](/struction/)。
消息类型
NXN协议有以下主要类型的消息:
Requests 期待对方的回应:
interface Request {
method: string;
params?: { ... };
}
Results 是对请求的成功响应:
interface Result {
[key: string]: unknown;
}
Errors 表示请求失败:
interface Error {
code: number;
message: string;
data?: unknown;
}
Notifications 是不需要响应的单向消息:
interface Notification {
method: string;
params?: { ... };
}
连接生命周期
1. 初始化
1.客户端发送带有协议版本和功能的initialize请求
2.应用程序服务端以其协议版本和功能进行响应
3.客户端发送initialize通知作为确认
4.正常消息交换开始
2. 消息交换
初始化后,支持以下模式:
- 请求-响应:客户端或应用程序服务端发送请求,另一个响应
- 通知:任何一方发送单向消息
3. 终端
任何一方都可以终止连接:
- 通过close() 清理关闭
- 传输断开
- 错误条件
错误处理
NXN协议定义了这些标准错误代码:
enum ErrorCode {
// 标准JSON-RPC错误代码
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603
}
SDK和应用程序可以在上面定义自己的错误代码 -32000.
错误通过以下方式传播:
- 对请求的错误响应
- 传输错误事件
- 协议级错误处理程序
实施示例
下面是一个实现NXN协议的应用程序服务端的基本示例:
import asyncio
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
app = Server("example-server")
@app.list_resources()
async def list_resources() -> list[types.Resource]:
return [
types.Resource(
uri="example://resource",
name="Example Resource"
)
]
async def main():
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
最佳实践
传输器选择
1.远程通信
在需要HTTP兼容性的场景中使用SSE
考虑安全影响,包括身份验证和授权
消息处理
1.请求处理
彻底验证输入
使用类型安全架构
优雅地处理错误
实施超时
2.进度报告
使用进度令牌进行长时间操作
逐步报告进度
包括已知的总进度
3.错误管理
使用适当的错误代码
包括有用的错误消息
在出现错误时清理资源
安全考虑
1.运输安全
使用TLS进行远程连接
验证连接来源
需要时实施身份验证
2.消息验证
验证所有传入消息
对输入进行消毒
检查邮件大小限制
验证JSON-RPC格式
3.资源保护
实施访问控制
验证资源路径
监控资源使用情况
速率限制请求
4.错误处理
不要泄露敏感信息
记录与安全相关的错误
实施适当的清理
处理DoS场景
调试和监控
1.日志记录
记录协议事件
跟踪消息流
监控性能
记录错误
2.诊断
实施健康检查
监控连接状态
跟踪资源使用情况
配置文件性能
3.测试
测试不同的运输方式
验证错误处理
检查边缘情况
负载测试服务器