程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

MCP 客户端连接与请求流程深度解析

balukai 2025-07-27 18:37:34 文章精选 4 ℃

1. 概述

Model Context Protocol (MCP) 是一个标准化的协议,使 AI 模型能够以结构化的方式与外部工具和资源交互。在 Spring AI 中,
HttpClientSseClientTransport 是实现 MCP 客户端通过 HTTP SSE(Server-Sent Events)与服务器通信的核心组件。

2. 核心架构

2.1 双向通信模式

MCP 采用双向通信模式:

  • HTTP GET:客户端 → 服务器(请求建立)
  • SSE 连接:服务器 → 客户端(推送事件)
  • HTTP POST:客户端 → 服务器(按需发送)

2.2 核心组件关系

McpSyncClient (同步客户端)
    ↓ 委托
McpAsyncClient (异步客户端)
    ↓ 使用
McpClientSession (会话管理)
    ↓ 使用
HttpClientSseClientTransport (SSE 传输层)
    ↓ 使用
FlowSseClient (SSE 客户端)

3. 详细流程分析

3.1客户端创建阶段

3.1.1 传输层配置

// 1. HTTP 客户端配置
HttpClient.Builder httpClientBuilder = HttpClient.newBuilder();
httpClientBuilder.connectTimeout(Duration.ofSeconds(5));

// 2. HTTP 请求构建器配置
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();

// 3. SSE 传输层构建
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder("http://localhost:8080")
        .requestBuilder(requestBuilder)
        .sseEndpoint("/customer/mcp/sse")
        .objectMapper(objectMapper)
        .clientBuilder(httpClientBuilder).build();

关键配置说明:

  • connectTimeout(Duration.ofSeconds(5)):设置连接超时为 5 秒
  • sseEndpoint("/customer/mcp/sse"):指定 SSE 端点路径
  • objectMapper:用于 JSON 序列化/反序列化

3.1.2 客户端构建

// 4. MCP 同步客户端创建
McpSyncClient mcpSyncClient = McpClient.sync(transport)
        .requestTimeout(Duration.ofSeconds(10))
        .build();

构建过程详解:

在 McpClient.SyncSpec.build() 方法中:

public McpSyncClient build() {
    McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(
        this.clientInfo, 
        this.capabilities,
        this.roots, 
        this.toolsChangeConsumers, 
        this.resourcesChangeConsumers, 
        this.promptsChangeConsumers,
        this.loggingConsumers, 
        this.samplingHandler
    );

    McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

    return new McpSyncClient(
        new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures)
    );
}

3.2 客户端初始化阶段

3.2.1 会话创建

在 McpAsyncClient 构造函数中:

McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
        McpClientFeatures.Async features) {
    
    // 配置请求处理器
    Map<String, RequestHandler<?>> requestHandlers = new HashMap<>();
    
    // 配置通知处理器
    Map<String, NotificationHandler> notificationHandlers = new HashMap<>();
    
    // 创建 MCP 会话
    this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
}

3.2.2 传输层连接建立

在 McpClientSession 构造函数中:

public McpClientSession(Duration requestTimeout, McpClientTransport transport,
        Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers) {
    
    // 建立传输层连接
    this.connection = this.transport.connect(mono -> mono.doOnNext(this::handle)).subscribe();
}

3.3SSE连接建立过程

3.3.1 连接初始化


HttpClientSseClientTransport.connect() 方法中:

@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    connectionFuture.set(future);

    // 构建 SSE 连接 URI
    URI clientUri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
    
    // 订阅 SSE 事件流
    sseClient.subscribe(clientUri.toString(), new FlowSseClient.SseEventHandler() {
        @Override
        public void onEvent(SseEvent event) {
            if (isClosing) {
                return;
            }

            try {
                if (ENDPOINT_EVENT_TYPE.equals(event.type())) {
                    // 处理端点发现事件
                    String endpoint = event.data();
                    messageEndpoint.set(endpoint);
                    closeLatch.countDown();
                    future.complete(null);
                }
                else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
                    // 处理 JSON-RPC 消息
                    JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data());
                    handler.apply(Mono.just(message)).subscribe();
                }
                else {
                    logger.error("Received unrecognized SSE event type: {}", event.type());
                }
            }
            catch (IOException e) {
                logger.error("Error processing SSE event", e);
                future.completeExceptionally(e);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (!isClosing) {
                logger.error("SSE connection error", error);
                future.completeExceptionally(error);
            }
        }
    });

    return Mono.fromFuture(future);
}

3.3.2 SSE 事件流处理

在 FlowSseClient.subscribe() 方法中:

public void subscribe(String url, SseEventHandler eventHandler) {
    // 构建 SSE 请求
    HttpRequest request = this.requestBuilder.uri(URI.create(url))
        .header("Accept", "text/event-stream")
        .header("Cache-Control", "no-cache")
        .GET()
        .build();

    StringBuilder eventBuilder = new StringBuilder();
    AtomicReference<String> currentEventId = new AtomicReference<>();
    AtomicReference<String> currentEventType = new AtomicReference<>("message");

    // 创建 Flow.Subscriber 处理 SSE 流
    Flow.Subscriber<String> lineSubscriber = new Flow.Subscriber<>() {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String line) {
            if (line.isEmpty()) {
                // 空行表示事件结束,处理完整事件
                if (eventBuilder.length() > 0) {
                    String eventData = eventBuilder.toString();
                    SseEvent event = new SseEvent(
                        currentEventId.get(), 
                        currentEventType.get(), 
                        eventData.trim()
                    );
                    eventHandler.onEvent(event);
                    eventBuilder.setLength(0);
                }
            }
            else {
                // 解析 SSE 事件字段
                if (line.startsWith("data:")) {
                    var matcher = EVENT_DATA_PATTERN.matcher(line);
                    if (matcher.find()) {
                        eventBuilder.append(matcher.group(1).trim()).append("\n");
                    }
                }
                else if (line.startsWith("id:")) {
                    var matcher = EVENT_ID_PATTERN.matcher(line);
                    if (matcher.find()) {
                        currentEventId.set(matcher.group(1).trim());
                    }
                }
                else if (line.startsWith("event:")) {
                    var matcher = EVENT_TYPE_PATTERN.matcher(line);
                    if (matcher.find()) {
                        currentEventType.set(matcher.group(1).trim());
                    }
                }
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            eventHandler.onError(throwable);
        }

        @Override
        public void onComplete() {
            // 处理剩余的事件数据
            if (eventBuilder.length() > 0) {
                String eventData = eventBuilder.toString();
                SseEvent event = new SseEvent(
                    currentEventId.get(), 
                    currentEventType.get(), 
                    eventData.trim()
                );
                eventHandler.onEvent(event);
            }
        }
    };

    // 发送异步请求
    Function<Flow.Subscriber<String>, HttpResponse.BodySubscriber<Void>> subscriberFactory = 
        subscriber -> HttpResponse.BodySubscribers.fromLineSubscriber(subscriber);

    CompletableFuture<HttpResponse<Void>> future = this.httpClient.sendAsync(request,
            info -> subscriberFactory.apply(lineSubscriber));

    future.thenAccept(response -> {
        int status = response.statusCode();
        if (status != 200 && status != 201 && status != 202 && status != 206) {
            throw new RuntimeException("Failed to connect to SSE stream. Unexpected status code: " + status);
        }
    }).exceptionally(throwable -> {
        eventHandler.onError(throwable);
        return null;
    });
}

3.4 客户端初始化

3.4.1初始化调用

// 5. 客户端初始化
mcpSyncClient.initialize();

3.4.2 初始化流程

在 McpAsyncClient.initialize() 方法中:

public Mono<McpSchema.InitializeResult> initialize() {
    String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);

    // 构建初始化请求
    McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(
        latestVersion,
        this.clientCapabilities,
        this.clientInfo
    );

    // 发送初始化请求
    Mono<McpSchema.InitializeResult> result = this.mcpSession.sendRequest(
        McpSchema.METHOD_INITIALIZE,
        initializeRequest, 
        new TypeReference<McpSchema.InitializeResult>() {}
    );

    return result.flatMap(initializeResult -> {
        // 保存服务器信息
        this.serverCapabilities = initializeResult.capabilities();
        this.serverInstructions = initializeResult.instructions();
        this.serverInfo = initializeResult.serverInfo();

        logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
                initializeResult.protocolVersion(), initializeResult.capabilities(), 
                initializeResult.serverInfo(), initializeResult.instructions());

        // 验证协议版本
        if (!this.protocolVersions.contains(initializeResult.protocolVersion())) {
            return Mono.error(new McpError(
                "Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
        }

        // 发送初始化完成通知
        return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
            .doOnSuccess(v -> {
                this.initialized.set(true);
                this.initializedSink.tryEmitValue(initializeResult);
            })
            .thenReturn(initializeResult);
    });
}

3.5 请求发送机制

3.5.1 请求发送流程


McpClientSession.sendRequest() 方法中:

@Override
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
    String requestId = this.generateRequestId();

    return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
        // 保存待处理的响应
        this.pendingResponses.put(requestId, sink);
        
        // 构建 JSON-RPC 请求
        McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(
            McpSchema.JSONRPC_VERSION, 
            method,
            requestId, 
            requestParams
        );
        
        // 发送请求
        this.transport.sendMessage(jsonrpcRequest)
            .contextWrite(ctx)
            .subscribe(v -> {
            }, error -> {
                this.pendingResponses.remove(requestId);
                sink.error(error);
            });
    })).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
        if (jsonRpcResponse.error() != null) {
            logger.error("Error handling request: {}", jsonRpcResponse.error());
            sink.error(new McpError(jsonRpcResponse.error()));
        }
        else {
            if (typeRef.getType().equals(Void.class)) {
                sink.complete();
            }
            else {
                sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
            }
        }
    });
}

3.5.2消息发送实现


HttpClientSseClientTransport.sendMessage() 方法中:

@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
    if (isClosing) {
        return Mono.empty();
    }

    try {
        // 等待端点发现
        if (!closeLatch.await(10, TimeUnit.SECONDS)) {
            return Mono.error(new McpError("Failed to wait for the message endpoint"));
        }
    }
    catch (InterruptedException e) {
        return Mono.error(new McpError("Failed to wait for the message endpoint"));
    }

    String endpoint = messageEndpoint.get();
    if (endpoint == null) {
        return Mono.error(new McpError("No message endpoint available"));
    }

    try {
        // 序列化消息
        String jsonText = this.objectMapper.writeValueAsString(message);
        
        // 构建请求 URI
        URI requestUri = Utils.resolveUri(baseUri, endpoint);
        
        // 构建 HTTP POST 请求
        HttpRequest request = this.requestBuilder.uri(requestUri)
            .POST(HttpRequest.BodyPublishers.ofString(jsonText))
            .build();

        // 发送异步请求
        return Mono.fromFuture(
            httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                .thenAccept(response -> {
                    if (response.statusCode() != 200 && response.statusCode() != 201 && 
                        response.statusCode() != 202 && response.statusCode() != 206) {
                        logger.error("Error sending message: {}", response.statusCode());
                    }
                })
        );
    }
    catch (IOException e) {
        if (!isClosing) {
            return Mono.error(new RuntimeException("Failed to serialize message", e));
        }
        return Mono.empty();
    }
}

4.关键机制总结

4.1双向通信机制

  1. SSE 连接:建立持久连接接收服务器推送
  2. 端点发现:通过 SSE 接收消息发送端点
  3. HTTP POST:使用发现的端点发送客户端消息

4.2事件处理机制

  1. endpoint 事件:服务器发送消息端点 URL
  2. message 事件:服务器发送 JSON-RPC 消息
  3. 错误处理:连接错误和消息处理错误

4.3会话管理机制

  1. 请求 ID 生成:确保请求-响应匹配
  2. 超时处理:请求超时和连接超时
  3. 状态管理:初始化状态和连接状态

4.4 错误处理机制

  1. 连接错误:SSE 连接失败处理
  2. 序列化错误:JSON 序列化/反序列化错误
  3. 超时错误:请求超时和端点发现超时

Tags:

最近发表
标签列表