网站首页 > 文章精选 正文
1. 概述
Model Context Protocol (MCP) 是一个标准化的协议,使 AI 模型能够以结构化的方式与外部工具和资源交互。在 Spring AI 中,
HttpClientSseClientTransport 是实现 MCP 客户端通过 HTTP SSE(Server-Sent Events)与服务器通信的核心组件。
2. 核心架构
2.1 双向通信模式
MCP 采用双向通信模式:
- HTTP GET:客户端 → 服务器(请求建立)
- SSE 连接:服务器 → 客户端(推送事件)
- HTTP POST:客户端 → 服务器(按需发送)
2.2 核心组件关系
McpSyncClient (同步客户端)
↓ 委托
McpAsyncClient (异步客户端)
↓ 使用
McpClientSession (会话管理)
↓ 使用
HttpClientSseClientTransport (SSE 传输层)
↓ 使用
FlowSseClient (SSE 客户端)
3. 详细流程分析
3.1客户端创建阶段
3.1.1 传输层配置
// 1. HTTP 客户端配置
HttpClient.Builder httpClientBuilder = HttpClient.newBuilder();
httpClientBuilder.connectTimeout(Duration.ofSeconds(5));
// 2. HTTP 请求构建器配置
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
// 3. SSE 传输层构建
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder("http://localhost:8080")
.requestBuilder(requestBuilder)
.sseEndpoint("/customer/mcp/sse")
.objectMapper(objectMapper)
.clientBuilder(httpClientBuilder).build();
关键配置说明:
- connectTimeout(Duration.ofSeconds(5)):设置连接超时为 5 秒
- sseEndpoint("/customer/mcp/sse"):指定 SSE 端点路径
- objectMapper:用于 JSON 序列化/反序列化
3.1.2 客户端构建
// 4. MCP 同步客户端创建
McpSyncClient mcpSyncClient = McpClient.sync(transport)
.requestTimeout(Duration.ofSeconds(10))
.build();
构建过程详解:
在 McpClient.SyncSpec.build() 方法中:
public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(
this.clientInfo,
this.capabilities,
this.roots,
this.toolsChangeConsumers,
this.resourcesChangeConsumers,
this.promptsChangeConsumers,
this.loggingConsumers,
this.samplingHandler
);
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
return new McpSyncClient(
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures)
);
}
3.2 客户端初始化阶段
3.2.1 会话创建
在 McpAsyncClient 构造函数中:
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
McpClientFeatures.Async features) {
// 配置请求处理器
Map<String, RequestHandler<?>> requestHandlers = new HashMap<>();
// 配置通知处理器
Map<String, NotificationHandler> notificationHandlers = new HashMap<>();
// 创建 MCP 会话
this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
}
3.2.2 传输层连接建立
在 McpClientSession 构造函数中:
public McpClientSession(Duration requestTimeout, McpClientTransport transport,
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers) {
// 建立传输层连接
this.connection = this.transport.connect(mono -> mono.doOnNext(this::handle)).subscribe();
}
3.3SSE连接建立过程
3.3.1 连接初始化
在
HttpClientSseClientTransport.connect() 方法中:
@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
CompletableFuture<Void> future = new CompletableFuture<>();
connectionFuture.set(future);
// 构建 SSE 连接 URI
URI clientUri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
// 订阅 SSE 事件流
sseClient.subscribe(clientUri.toString(), new FlowSseClient.SseEventHandler() {
@Override
public void onEvent(SseEvent event) {
if (isClosing) {
return;
}
try {
if (ENDPOINT_EVENT_TYPE.equals(event.type())) {
// 处理端点发现事件
String endpoint = event.data();
messageEndpoint.set(endpoint);
closeLatch.countDown();
future.complete(null);
}
else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
// 处理 JSON-RPC 消息
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data());
handler.apply(Mono.just(message)).subscribe();
}
else {
logger.error("Received unrecognized SSE event type: {}", event.type());
}
}
catch (IOException e) {
logger.error("Error processing SSE event", e);
future.completeExceptionally(e);
}
}
@Override
public void onError(Throwable error) {
if (!isClosing) {
logger.error("SSE connection error", error);
future.completeExceptionally(error);
}
}
});
return Mono.fromFuture(future);
}
3.3.2 SSE 事件流处理
在 FlowSseClient.subscribe() 方法中:
public void subscribe(String url, SseEventHandler eventHandler) {
// 构建 SSE 请求
HttpRequest request = this.requestBuilder.uri(URI.create(url))
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.GET()
.build();
StringBuilder eventBuilder = new StringBuilder();
AtomicReference<String> currentEventId = new AtomicReference<>();
AtomicReference<String> currentEventType = new AtomicReference<>("message");
// 创建 Flow.Subscriber 处理 SSE 流
Flow.Subscriber<String> lineSubscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String line) {
if (line.isEmpty()) {
// 空行表示事件结束,处理完整事件
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(
currentEventId.get(),
currentEventType.get(),
eventData.trim()
);
eventHandler.onEvent(event);
eventBuilder.setLength(0);
}
}
else {
// 解析 SSE 事件字段
if (line.startsWith("data:")) {
var matcher = EVENT_DATA_PATTERN.matcher(line);
if (matcher.find()) {
eventBuilder.append(matcher.group(1).trim()).append("\n");
}
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
currentEventId.set(matcher.group(1).trim());
}
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
currentEventType.set(matcher.group(1).trim());
}
}
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
eventHandler.onError(throwable);
}
@Override
public void onComplete() {
// 处理剩余的事件数据
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(
currentEventId.get(),
currentEventType.get(),
eventData.trim()
);
eventHandler.onEvent(event);
}
}
};
// 发送异步请求
Function<Flow.Subscriber<String>, HttpResponse.BodySubscriber<Void>> subscriberFactory =
subscriber -> HttpResponse.BodySubscribers.fromLineSubscriber(subscriber);
CompletableFuture<HttpResponse<Void>> future = this.httpClient.sendAsync(request,
info -> subscriberFactory.apply(lineSubscriber));
future.thenAccept(response -> {
int status = response.statusCode();
if (status != 200 && status != 201 && status != 202 && status != 206) {
throw new RuntimeException("Failed to connect to SSE stream. Unexpected status code: " + status);
}
}).exceptionally(throwable -> {
eventHandler.onError(throwable);
return null;
});
}
3.4 客户端初始化
3.4.1初始化调用
// 5. 客户端初始化
mcpSyncClient.initialize();
3.4.2 初始化流程
在 McpAsyncClient.initialize() 方法中:
public Mono<McpSchema.InitializeResult> initialize() {
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
// 构建初始化请求
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(
latestVersion,
this.clientCapabilities,
this.clientInfo
);
// 发送初始化请求
Mono<McpSchema.InitializeResult> result = this.mcpSession.sendRequest(
McpSchema.METHOD_INITIALIZE,
initializeRequest,
new TypeReference<McpSchema.InitializeResult>() {}
);
return result.flatMap(initializeResult -> {
// 保存服务器信息
this.serverCapabilities = initializeResult.capabilities();
this.serverInstructions = initializeResult.instructions();
this.serverInfo = initializeResult.serverInfo();
logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
initializeResult.protocolVersion(), initializeResult.capabilities(),
initializeResult.serverInfo(), initializeResult.instructions());
// 验证协议版本
if (!this.protocolVersions.contains(initializeResult.protocolVersion())) {
return Mono.error(new McpError(
"Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
}
// 发送初始化完成通知
return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
.doOnSuccess(v -> {
this.initialized.set(true);
this.initializedSink.tryEmitValue(initializeResult);
})
.thenReturn(initializeResult);
});
}
3.5 请求发送机制
3.5.1 请求发送流程
在
McpClientSession.sendRequest() 方法中:
@Override
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
String requestId = this.generateRequestId();
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
// 保存待处理的响应
this.pendingResponses.put(requestId, sink);
// 构建 JSON-RPC 请求
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(
McpSchema.JSONRPC_VERSION,
method,
requestId,
requestParams
);
// 发送请求
this.transport.sendMessage(jsonrpcRequest)
.contextWrite(ctx)
.subscribe(v -> {
}, error -> {
this.pendingResponses.remove(requestId);
sink.error(error);
});
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
if (jsonRpcResponse.error() != null) {
logger.error("Error handling request: {}", jsonRpcResponse.error());
sink.error(new McpError(jsonRpcResponse.error()));
}
else {
if (typeRef.getType().equals(Void.class)) {
sink.complete();
}
else {
sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
}
}
});
}
3.5.2消息发送实现
在
HttpClientSseClientTransport.sendMessage() 方法中:
@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
if (isClosing) {
return Mono.empty();
}
try {
// 等待端点发现
if (!closeLatch.await(10, TimeUnit.SECONDS)) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
}
catch (InterruptedException e) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
String endpoint = messageEndpoint.get();
if (endpoint == null) {
return Mono.error(new McpError("No message endpoint available"));
}
try {
// 序列化消息
String jsonText = this.objectMapper.writeValueAsString(message);
// 构建请求 URI
URI requestUri = Utils.resolveUri(baseUri, endpoint);
// 构建 HTTP POST 请求
HttpRequest request = this.requestBuilder.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(jsonText))
.build();
// 发送异步请求
return Mono.fromFuture(
httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())
.thenAccept(response -> {
if (response.statusCode() != 200 && response.statusCode() != 201 &&
response.statusCode() != 202 && response.statusCode() != 206) {
logger.error("Error sending message: {}", response.statusCode());
}
})
);
}
catch (IOException e) {
if (!isClosing) {
return Mono.error(new RuntimeException("Failed to serialize message", e));
}
return Mono.empty();
}
}
4.关键机制总结
4.1双向通信机制
- SSE 连接:建立持久连接接收服务器推送
- 端点发现:通过 SSE 接收消息发送端点
- HTTP POST:使用发现的端点发送客户端消息
4.2事件处理机制
- endpoint 事件:服务器发送消息端点 URL
- message 事件:服务器发送 JSON-RPC 消息
- 错误处理:连接错误和消息处理错误
4.3会话管理机制
- 请求 ID 生成:确保请求-响应匹配
- 超时处理:请求超时和连接超时
- 状态管理:初始化状态和连接状态
4.4 错误处理机制
- 连接错误:SSE 连接失败处理
- 序列化错误:JSON 序列化/反序列化错误
- 超时错误:请求超时和端点发现超时
猜你喜欢
- 2025-07-27 HTTP状态码滥用指南:全栈开发者的REST API最佳实践
- 2025-07-27 开发者必须了解的HTTP头部(http头部参数)
- 2025-07-27 轻松操控C#下载文件:WebClient与HttpClient实战详解
- 2025-07-27 http状态码完整版,再也不需要度娘了。
- 2025-07-27 探索Apache HttpClient超时时间如何设定?
- 2025-07-27 Java实现调用HTTP请求的几种常见方式
- 2025-07-27 HTTP与HTTPS的区别(http和https之间的区别)
- 2025-07-27 100个Java工具类之6:用4种方式发起HTTP请求
- 2025-07-27 HTTP和HTTPS(HTTP和HTTPS的主要区别是什么)
- 2025-07-27 http常见状态码(http常见状态码的含义)
- 最近发表
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)
- mysql数据库面试题 (57)
- fmt.println (52)