AI Agent 的流式响应与实时交互:从 SSE 到 WebSocket 的 Java 实战

系列文章

本系列致力于用 Java 生态深入理解 AI Agent 的核心机制。以下是完整目录:

  1. 理解 AI Agent 的大脑:ReAct 模式从入门到实战
  2. AI Agent 的工具箱:深入理解 Tool Use 与 Spring AI Function Calling 实战
  3. AI Agent 的记忆力是怎么实现的——LangChain4j Memory 机制深度解析
  4. AI Agent 的记忆系统:从 ChatMemory 到持久化记忆的 Java 实战
  5. AI Agent 的灵魂对话:Prompt Engineering 系统提示词设计的艺术与工程
  6. AI Agent 的规划大脑:从任务分解到自适应执行策略
  7. MCP 模型上下文协议:AI 的万能接口与 MCP Server 实战
  8. 让 AI 学会”说人话”——Spring AI 结构化输出实战
  9. Spring AI 核心架构全解析:从 ChatModel 到 Advisor Chain 的设计哲学
  10. 从零理解 RAG:检索增强生成完整指南
  11. Embedding 向量化的魔法:从文本到向量的数学之旅与 Java 实战
  12. AI Agent 的知识检索引擎:从向量搜索到智能检索策略的 Java 实战
  13. 当 RAG 遇上知识图谱:GraphRAG 原理与 Java 实战
  14. 当 RAG 遇到 Agent:Agentic RAG 的架构设计与 Java 实战
  15. AI Agent 团队协作:多 Agent 系统架构设计与 Java 实战
  16. AI Agent 的安全防线:Prompt 注入防御与生产级安全防护实战
  17. AI Agent 的可观测性:从链路追踪到成本监控的 Java 实战
  18. AI Agent 的评估与优化:从基准测试到生产环境的质量守护实战
  19. AI Agent 的流式响应与实时交互:从 SSE 到 WebSocket 的 Java 实战(本文)

引言:你有没有想过,为什么 ChatGPT 的回答是”一个字一个字蹦出来”的?

如果你用过 ChatGPT、Claude 或者任何 AI 聊天产品,你一定注意到了一个现象:AI 的回答不是一次性全部展示出来的,而是像打字机一样,一个 token 一个 token 地”蹦”出来。

这不是炫技,也不是为了好看。这是 LLM(大语言模型)的工作方式决定的——模型是逐 token 生成的,每生成一个 token 都需要一次前向推理。如果等全部生成完再返回,用户可能要等 10 秒甚至更久才能看到第一个字。但通过流式输出,用户在 0.5 秒内就能看到第一个 token,感知延迟降低了 20 倍以上

这就是今天要聊的主题:AI Agent 的流式响应与实时交互

在之前的系列文章中,我们已经深入理解了 Agent 的记忆系统工具调用RAG 检索等核心能力。但有一个问题我们一直没正面回答:Agent 的响应怎么实时传递给用户?

这个问题看似简单,实则涉及一整条技术链路:从 LLM 的 token 流,到后端的流式协议,到前端的实时渲染,再到工具调用时的流式聚合。任何一个环节出问题,用户体验就会断崖式下降。

本文将从底层协议讲起,逐步深入到 Spring AI 和 LangChain4j 的流式 API 实现,最后给出一个完整的生产级流式 Agent 系统。

一、流式响应的技术基础:SSE vs WebSocket

在实现流式输出之前,我们需要先选择一种”管道”来把数据从服务端推送到客户端。主流方案有两种:SSE(Server-Sent Events)WebSocket

1.1 SSE:单向推送的”收音机”

SSE 是一种基于 HTTP 的单向推送协议。客户端通过普通的 HTTP 请求建立连接,服务端持续向客户端推送数据,直到连接关闭。

它的核心特点是简单

1
2
客户端 --GET /stream--> 服务端
客户端 <--text/event-stream-- 服务端(持续推送)

SSE 的数据格式是纯文本,每条消息以 data: 开头,用 \n\n 分隔:

1
2
3
4
5
6
7
data: {"token": "你"}
\n\n
data: {"token": "好"}
\n\n
data: {"token": "啊"}
\n\n
data: [DONE]

为什么 SSE 特别适合 LLM 流式输出?

  1. 基于 HTTP:不需要额外的协议升级,Nginx、CDN、负载均衡器都能正常代理
  2. 自动重连:浏览器原生支持 EventSource API,断线后自动重连
  3. 单向推送:LLM 的流式输出天然是单向的(服务端 → 客户端),不需要双向通信
  4. 文本友好:LLM 输出的就是文本,SSE 的文本格式完美匹配

1.2 WebSocket:双向通信的”电话”

WebSocket 是一种全双工通信协议,客户端和服务端可以随时互发消息。

1
2
3
客户端 --GET /ws (Upgrade)--> 服务端
客户端 <--handshake-- 服务端
客户端 <---> 服务端(双向通信)

WebSocket 的特点是双向二进制支持,但它也有代价:

  1. 协议升级:需要从 HTTP 升级到 WebSocket,经过代理时可能有问题
  2. 连接管理:需要自己实现心跳、重连、断线检测
  3. 状态保持:WebSocket 连接是有状态的,水平扩展时需要 sticky session 或消息总线

1.3 SSE vs WebSocket:LLM 流式输出该选谁?

维度 SSE WebSocket
通信方向 单向(服务端→客户端) 双向
协议 HTTP/1.1 或 HTTP/2 独立协议(ws://)
代理兼容性 极好(标准 HTTP) 一般(需要升级支持)
自动重连 浏览器原生支持 需手动实现
二进制支持 不支持 支持
连接开销 中等
适用场景 LLM 流式输出、通知推送 聊天室、游戏、实时协作

结论:对于 AI Agent 的流式输出,SSE 是首选方案。原因很简单——LLM 的流式输出是典型的”服务端单向推送文本”场景,SSE 的设计哲学与此完美匹配。只有在需要双向实时交互(比如用户中途打断 Agent、实时协作编辑 Agent 配置)时,才需要考虑 WebSocket。

Spring AI 的官方示例也选择了 SSE 作为流式输出的默认方案。OpenAI、Anthropic、Google 的 API 也都使用 SSE 作为流式传输协议。

二、Spring AI 的流式输出:从接口到源码

2.1 核心接口:StreamingChatModel

Spring AI 为流式输出定义了专门的接口 StreamingChatModel,与同步的 ChatModel 并列:

1
2
3
4
5
6
7
8
9
10
11
public interface StreamingChatModel extends Model<Prompt, ChatResponse> {
// 同步方法(继承自 Model)
ChatResponse call(Prompt prompt);

// 流式方法
default Flux<ChatResponse> stream(Prompt prompt) {
return stream(prompt, null);
}

Flux<ChatResponse> stream(Prompt prompt, ChatResponse previousChatResponse);
}

这里有一个设计上的巧思:StreamingChatModel 同时继承了 call()stream() 方法。这意味着一个 Model 实现可以同时支持同步和流式两种调用方式。在 OpenAI 的实现中,OpenAiChatModel 同时实现了 ChatModelStreamingChatModel

2.2 源码分析:OpenAI 流式调用的内部机制

让我们看看 Spring AI 中 OpenAI 的流式实现。核心逻辑在 OpenAiChatModel 类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// OpenAiChatModel.java(简化版)
public class OpenAiChatModel implements ChatModel, StreamingChatModel {

private final OpenAiApi api; // 底层 HTTP 客户端

@Override
public Flux<ChatResponse> stream(Prompt prompt, ChatResponse previousChatResponse) {
// 1. 构建请求参数
ChatCompletionRequest request = createRequest(prompt, true); // stream=true

// 2. 调用 OpenAI API,获取 SSE 流
Flux<ChatCompletionChunk> chunkFlux = this.api.chatCompletionStream(request);

// 3. 将 chunk 流转换为 ChatResponse 流
return chunkFlux.map(this::toChatResponse)
.switchOnFirst((signal, flux) -> {
// 处理第一个 chunk(包含 role 信息)
// 后续 chunk 包含 content 片段
});
}
}

关键步骤拆解:

  1. **设置 stream=true**:告诉 OpenAI API 返回 SSE 流而非完整 JSON
  2. 获取 chunk 流:底层 HTTP 客户端(RestClient 或 WebClient)解析 SSE 响应,每个 data: 行解析为一个 ChatCompletionChunk
  3. 转换为 ChatResponse:将每个 chunk 转换为 Spring AI 统一的 ChatResponse 对象

2.3 实战:构建流式聊天 API

下面是一个完整的流式聊天接口实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RestController
@RequestMapping("/api/chat")
public class StreamChatController {

private final StreamingChatModel streamingChatModel;

public StreamChatController(StreamingChatModel streamingChatModel) {
this.streamingChatModel = streamingChatModel;
}

/**
* SSE 流式聊天接口
* produces = MediaType.TEXT_EVENT_STREAM_VALUE 表示返回 SSE 流
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(@RequestParam String message) {
Prompt prompt = new Prompt(new UserMessage(message));

return streamingChatModel.stream(prompt)
.map(response -> {
String content = response.getResult().getOutput().getText();
return ServerSentEvent.<String>builder()
.data(content != null ? content : "")
.build();
})
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
))
.onErrorResume(e -> {
// 流式错误处理:发送错误事件后关闭
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("流式输出异常: " + e.getMessage())
.build()
);
});
}
}

前端消费示例(JavaScript):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const eventSource = new EventSource('/api/chat/stream?message=你好');

eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
// 逐 token 追加到页面
document.getElementById('response').textContent += event.data;
};

eventSource.onerror = (error) => {
console.error('SSE 连接错误:', error);
eventSource.close();
};

2.4 流式输出中的 Tool Call:最容易踩的坑

在之前的Tool Use 文章中,我们详细讲解了 Function Calling 的原理。但在流式场景下,Tool Call 的处理变得格外复杂。

问题在哪?

同步调用时,模型返回的 Tool Call 是一个完整的 JSON:

1
2
3
4
5
6
7
8
{
"tool_calls": [{
"function": {
"name": "getWeather",
"arguments": "{\"city\": \"北京\"}"
}
}]
}

但在流式调用中,这个 Tool Call 被拆成了多个 chunk

1
2
3
4
chunk 1: {"tool_calls": [{"index": 0, "function": {"name": "get"}}]}
chunk 2: {"tool_calls": [{"index": 0, "function": {"name": "Weather"}}]}
chunk 3: {"tool_calls": [{"index": 0, "function": {"arguments": "{\"ci"}}]}
chunk 4: {"tool_calls": [{"index": 0, "function": {"arguments": "ty\": \"北京\"}"}}]}

你需要把碎片拼回来,才能得到完整的 Tool Call。

Spring AI 怎么处理的?

Spring AI 在内部维护了一个聚合器(Aggregator),负责将流式 chunk 合并为完整的 ChatResponse。核心逻辑在 ChatCompletionChunkAggregator 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 简化的聚合逻辑
public class ChatCompletionChunkAggregator {

private final StringBuilder contentBuilder = new StringBuilder();
private final Map<Integer, ToolCallBuilder> toolCallBuilders = new HashMap<>();

public ChatResponse aggregate(ChatCompletionChunk chunk) {
// 1. 聚合文本内容
String delta = chunk.getChoices().get(0).getDelta().getContent();
if (delta != null) {
contentBuilder.append(delta);
}

// 2. 聚合 Tool Call
List<ToolCallDelta> toolCalls = chunk.getChoices().get(0).getDelta().getToolCalls();
if (toolCalls != null) {
for (ToolCallDelta tc : toolCalls) {
toolCallBuilders.computeIfAbsent(tc.getIndex(), k -> new ToolCallBuilder())
.appendName(tc.getFunction().getName())
.appendArguments(tc.getFunction().getArguments());
}
}

// 3. 当 chunk 的 finish_reason 不为 null 时,表示流结束
if (chunk.getChoices().get(0).getFinishReason() != null) {
return buildFinalResponse();
}

return null; // 还没结束,继续聚合
}
}

实际使用时需要注意什么?

如果你想在流式输出中处理 Tool Call,不能简单地逐 chunk 发送给客户端。你需要判断:当前 chunk 是纯文本还是 Tool Call?

  • 纯文本 chunk:直接推送给客户端,实时展示
  • Tool Call chunk:需要在服务端聚合,执行工具,然后将结果再次送给模型,最后把模型的二次响应流式推送

这是一个”流中断”的过程——流式输出会暂停,等工具执行完再继续。Spring AI 的 stream() 方法在内部自动处理了这个流程,但如果你自己实现 SSE Controller,需要理解这个机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 带 Tool Call 的流式输出
* Spring AI 会自动处理 Tool Call 的聚合和执行
*/
@GetMapping(value = "/stream-with-tools", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithTools(@RequestParam String message) {
Prompt prompt = new Prompt(
new UserMessage(message),
OpenAiChatOptions.builder()
.function("getWeather") // 注册可用工具
.function("searchDatabase")
.build()
);

return streamingChatModel.stream(prompt)
.map(response -> {
String content = response.getResult().getOutput().getText();
// 注意:Tool Call 执行期间,content 可能为 null
// 但 Tool Call 完成后的最终响应会包含完整文本
if (content != null && !content.isEmpty()) {
return ServerSentEvent.<String>builder()
.data(content)
.build();
}
return ServerSentEvent.<String>builder()
.event("tool-call")
.data("正在调用工具...")
.build();
});
}

三、LangChain4j 的流式输出:另一种设计哲学

3.1 StreamingChatLanguageModel 与 TokenStream

LangChain4j 的流式 API 设计与 Spring AI 截然不同。Spring AI 用响应式的 Flux<T>,LangChain4j 用回调式的 TokenStream

1
2
3
4
5
6
7
8
9
10
11
12
13
// LangChain4j 的流式接口
public interface StreamingChatLanguageModel extends ChatLanguageModel {
void generate(List<ChatMessage> messages, TokenStreamHandler handler);
}

// TokenStream 的使用方式
public interface TokenStream {
TokenStream onPartialResponse(Consumer<String> handler); // 每个 token 的回调
TokenStream onToolCalled(Consumer<ToolExecution> handler); // Tool Call 回调
TokenStream onCompleteResponse(Consumer<ChatResponse> handler); // 完成回调
TokenStream onError(ThrowableHandler handler); // 错误回调
void start(); // 开始生成
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
.apiKey(System.getenv("OPENAI_API_KEY"))
.modelName("gpt-4o")
.build();

TokenStream tokenStream = model.generate(
List.of(new UserMessage("你好")),
handler -> handler
.onPartialResponse(token -> System.out.print(token)) // 逐 token 输出
.onCompleteResponse(response -> System.out.println("\n完成"))
.onError(error -> error.printStackTrace())
);
tokenStream.start();

3.2 设计哲学对比:Flux vs Callback

维度 Spring AI (Flux) LangChain4j (Callback)
编程模型 响应式(Reactive Streams) 回调式(Callback)
背压支持 原生支持 不支持
组合能力 可用 map/filter/concat 等操作符链式处理 需手动在回调中处理
学习曲线 需要理解 Reactor 直观,符合传统 Java 思维
与 Web 框架集成 Spring WebFlux 原生支持 需要桥接(如 CompletableFuture)
错误处理 onErrorResume/onErrorReturn onError 回调

哪个更好? 没有绝对答案。如果你的项目已经用了 Spring WebFlux,Spring AI 的 Flux 方案更自然;如果你用的是传统的 Spring MVC,LangChain4j 的回调方式更容易上手。

不过,从工程角度看,Flux 的组合能力更强。比如你要实现”先流式输出文本,遇到 Tool Call 时暂停,执行工具后继续流式输出”,用 Flux 可以优雅地用 concatMap 实现,而回调方式需要手动管理状态机。

四、流式响应的进阶话题

4.1 错误处理:流中断了怎么办?

流式输出的最大挑战是错误处理。同步调用时,一个 try-catch 就能捕获异常;但流式调用中,错误可能发生在流的任何位置。

常见错误场景

  1. 网络中断:客户端断网、服务端重启
  2. Token 耗尽:模型输出到一半,达到 max_tokens 限制
  3. 上游超时:LLM API 响应慢,连接超时
  4. Tool Call 失败:工具执行抛异常

Spring AI 的错误处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
streamingChatModel.stream(prompt)
// 策略 1:遇到错误时返回一个兜底响应
.onErrorResume(e -> {
log.error("流式输出异常", e);
ChatResponse fallback = new ChatResponse(
List.of(new Generation(new AssistantMessage("抱歉,服务暂时不可用,请稍后重试。")))
);
return Flux.just(fallback);
})
// 策略 2:超时控制
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class, e -> {
return Flux.just(new ChatResponse(
List.of(new Generation(new AssistantMessage("响应超时,请重试。")))
);
});

关键原则:流式输出的错误处理要做到优雅降级——即使出错,也要给客户端一个明确的信号(错误事件或兜底文本),而不是突然断开连接让前端不知道发生了什么。

4.2 背压与缓冲:生产者太快怎么办?

当 LLM 的 token 生成速度超过客户端的消费速度时,就会产生背压(Backpressure)。比如:

  • 模型每秒生成 100 个 token
  • 客户端网络慢,每秒只能接收 20 个 token
  • 多余的 80 个 token 会积压在服务端内存中

Spring AI 基于 Reactor,原生支持背压。Flux 的消费者可以通过 request(n) 告诉生产者”我准备好了,给我 n 个元素”。在 SSE 场景中,Spring WebFlux 的 SseEmitter 会自动管理背压。

但在某些场景下,你可能需要手动控制缓冲策略:

1
2
3
4
5
6
7
8
9
10
streamingChatModel.stream(prompt)
// 策略 1:丢弃旧数据(适合实时性要求高的场景)
.onBackpressureDrop(overflowed -> {
log.warn("背压丢弃: {}", overflowed);
})
// 策略 2:缓冲(适合不能丢数据的场景)
.onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_OLDEST)
// 策略 3:批量发送(减少网络请求)
.bufferTimeout(10, Duration.ofMillis(100))
.map(tokens -> String.join("", tokens));

实践建议:大多数场景下,让 Spring WebFlux 自动管理背压即可。只有在高并发场景下(比如一个 Agent 同时服务上百个用户)才需要手动调优。

4.3 并发场景:多个用户同时请求怎么办?

Agent 系统通常是多用户并发的。每个用户的请求都需要一个独立的流式连接。这对服务端的连接管理提出了挑战。

问题:每个 SSE 连接都会占用一个线程(或在 WebFlux 中占用一个事件循环的调度)。如果 100 个用户同时请求,就需要 100 个并发流。

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class StreamingConfig {

@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofSeconds(60))
.maxConnections(200) // 最大连接数
));
}
}

同时,需要确保 LLM API 的并发限制不会成为瓶颈。OpenAI 的 API 有并发请求限制(Rate Limit),你需要在服务端做请求排队限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class RateLimitedChatService {

private final Semaphore semaphore = new Semaphore(50); // 最多 50 个并发
private final StreamingChatModel streamingChatModel;

public Flux<ChatResponse> streamWithRateLimit(Prompt prompt) {
return Flux.defer(() -> {
if (semaphore.tryAcquire()) {
return streamingChatModel.stream(prompt)
.doFinally(signal -> semaphore.release());
} else {
return Flux.error(new RuntimeException("请求过于繁忙,请稍后重试"));
}
});
}
}

4.4 生产环境的监控指标

流式输出的监控比同步调用更复杂。你需要关注以下指标:

指标 含义 告警阈值建议
TTFT(Time to First Token) 从请求到收到第一个 token 的时间 > 3s
TPS(Tokens per Second) 每秒生成的 token 数 < 10
流完成率 正常完成的流占比 < 95%
流中断率 中途断开的流占比 > 5%
并发流数 当前活跃的 SSE 连接数 > 100
Tool Call 聚合耗时 从收到第一个 Tool Call chunk 到聚合完成的时间 > 2s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
public class StreamMetrics {

private final MeterRegistry registry;
private final Timer ttftTimer;
private final Counter streamCounter;
private final AtomicInteger activeStreams = new AtomicInteger(0);

public StreamMetrics(MeterRegistry registry) {
this.registry = registry;
this.ttftTimer = Timer.builder("agent.stream.ttft")
.description("Time to first token")
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry);
this.streamCounter = Counter.builder("agent.stream.total")
.description("Total stream requests")
.register(registry);
}

public void recordStreamStart() {
activeStreams.incrementAndGet();
streamCounter.increment();
}

public void recordTTFT(Duration duration) {
ttftTimer.record(duration);
}

public void recordStreamEnd(boolean completed) {
activeStreams.decrementAndGet();
Counter.builder("agent.stream.completion")
.tag("status", completed ? "success" : "interrupted")
.register(registry)
.increment();
}
}

五、实战:完整的流式 Agent 系统

下面给出一个整合了 Memory、Tool Call 和流式输出的完整 Agent 系统示例。

5.1 项目依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

5.2 完整的流式 Agent Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@RestController
@RequestMapping("/api/agent")
public class StreamingAgentController {

private final StreamingChatModel chatModel;
private final ChatMemory chatMemory;
private final WeatherService weatherService;
private final StreamMetrics streamMetrics;

public StreamingAgentController(
StreamingChatModel chatModel,
ChatMemory chatMemory,
WeatherService weatherService,
StreamMetrics streamMetrics) {
this.chatModel = chatModel;
this.chatMemory = chatMemory;
this.weatherService = weatherService;
this.streamMetrics = streamMetrics;
}

/**
* 流式 Agent 对话接口
* 支持记忆、工具调用、流式输出
*/
@GetMapping(value = "/chat/{conversationId}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamAgentChat(
@PathVariable String conversationId,
@RequestParam String message) {

streamMetrics.recordStreamStart();
Instant startTime = Instant.now();

// 1. 从记忆中获取历史消息
List<Message> history = chatMemory.get(conversationId);

// 2. 构建包含历史和工具的 Prompt
List<Message> messages = new ArrayList<>(history);
messages.add(new UserMessage(message));

Prompt prompt = new Prompt(messages,
OpenAiChatOptions.builder()
.function("getWeather")
.function("searchKnowledge")
.build()
);

// 3. 流式调用
return chatModel.stream(prompt)
.index() // 添加索引,用于判断是否是第一个 token
.map(tuple -> {
long index = tuple.getT1();
ChatResponse response = tuple.getT2();

// 记录 TTFT
if (index == 0) {
streamMetrics.recordTTFT(
Duration.between(startTime, Instant.now())
);
}

String content = response.getResult().getOutput().getText();
return ServerSentEvent.<String>builder()
.id(String.valueOf(index))
.data(content != null ? content : "")
.build();
})
// 发送完成信号
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
))
// 保存对话到记忆
.doOnComplete(() -> {
chatMemory.add(conversationId, new UserMessage(message));
// 注意:流式场景下,需要在流完成后保存完整的 assistant 回复
// 这里简化处理,实际需要聚合完整响应
streamMetrics.recordStreamEnd(true);
})
// 错误处理
.onErrorResume(e -> {
log.error("流式 Agent 对话异常, conversationId={}", conversationId, e);
streamMetrics.recordStreamEnd(false);
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("服务异常: " + e.getMessage())
.build()
);
});
}
}

5.3 流式场景下的记忆管理

在流式输出中,保存记忆有一个特殊的挑战:你需要等流完全结束后,才能把完整的 assistant 回复存入记忆

如果在每个 chunk 到达时就保存,记忆中只会存下碎片。如果在流开始时就预分配,又会浪费内存。

最佳实践:用 doOnCompletereduce 操作符聚合完整响应后再保存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 聚合完整响应后再保存到记忆
AtomicReference<StringBuilder> fullResponse = new AtomicReference<>(new StringBuilder());

chatModel.stream(prompt)
.doOnNext(response -> {
String content = response.getResult().getOutput().getText();
if (content != null) {
fullResponse.get().append(content);
}
})
.doOnComplete(() -> {
// 流结束,保存完整对话
chatMemory.add(conversationId, new UserMessage(message));
chatMemory.add(conversationId,
new AssistantMessage(fullResponse.get().toString()));
});

六、技术的边界与未来

6.1 什么时候不该用流式输出?

流式输出不是万能的。以下场景不适合:

  1. API 集成场景:如果你的 Agent 是被其他服务调用的(而非直接面对用户),流式输出增加了对接方的复杂度,同步调用更简单
  2. 短响应场景:如果 Agent 的回答只有几个字(比如”是”或”否”),流式输出的开销反而更大
  3. 需要完整校验的场景:如果你需要在返回前对 Agent 的输出做安全检查(比如敏感词过滤),流式输出会让校验变得复杂——你需要在流结束后再补一次校验

6.2 未来方向

  1. 多模态流式输出:随着 GPT-4o、Gemini 等多模态模型的普及,流式输出将不只是文本——图片、音频也可能是流式的。想象一下,Agent 先流式输出文字描述,然后流式生成一张图片
  2. 流式 Tool Call 的标准化:目前不同模型提供商对流式 Tool Call 的处理方式不统一,未来可能会有统一的协议标准
  3. WebTransport:作为 HTTP/3 的一部分,WebTransport 提供了比 SSE 更强大的流式传输能力(支持多路复用和可靠/不可靠传输),可能成为下一代 LLM 流式协议

总结

流式响应是 AI Agent 系统中看似简单、实则复杂的一环。它不只是”把响应一个字一个字发出去”,而是涉及协议选择、Tool Call 聚合、错误处理、背压管理、并发控制等一系列工程问题。

核心要点回顾

  1. SSE 是 LLM 流式输出的首选协议——基于 HTTP、自动重连、天然适合单向推送
  2. **Spring AI 用 Flux<ChatResponse>,LangChain4j 用 TokenStream**——前者组合能力强,后者上手简单
  3. 流式 Tool Call 需要聚合——模型会把一个 Tool Call 拆成多个 chunk,你需要在服务端拼回来
  4. 流式错误处理要优雅降级——不能让连接突然断开,要给客户端明确的信号
  5. 记忆保存要在流结束后进行——聚合完整响应后再存入记忆,避免碎片化

如果你正在构建一个面向用户的 AI Agent 产品,流式输出是必须支持的能力。它不是锦上添花,而是用户体验的底线。没有人愿意盯着一个空白页面等 10 秒。