系列文章
本系列致力于用 Java 生态深入理解 AI Agent 的核心机制。以下是完整目录:
- 理解 AI Agent 的大脑:ReAct 模式从入门到实战
- AI Agent 的工具箱:深入理解 Tool Use 与 Spring AI Function Calling 实战
- AI Agent 的记忆力是怎么实现的——LangChain4j Memory 机制深度解析
- AI Agent 的记忆系统:从 ChatMemory 到持久化记忆的 Java 实战
- AI Agent 的灵魂对话:Prompt Engineering 系统提示词设计的艺术与工程
- AI Agent 的规划大脑:从任务分解到自适应执行策略
- MCP 模型上下文协议:AI 的万能接口与 MCP Server 实战
- 让 AI 学会”说人话”——Spring AI 结构化输出实战
- Spring AI 核心架构全解析:从 ChatModel 到 Advisor Chain 的设计哲学
- 从零理解 RAG:检索增强生成完整指南
- Embedding 向量化的魔法:从文本到向量的数学之旅与 Java 实战
- AI Agent 的知识检索引擎:从向量搜索到智能检索策略的 Java 实战
- 当 RAG 遇上知识图谱:GraphRAG 原理与 Java 实战
- 当 RAG 遇到 Agent:Agentic RAG 的架构设计与 Java 实战
- AI Agent 团队协作:多 Agent 系统架构设计与 Java 实战
- AI Agent 的安全防线:Prompt 注入防御与生产级安全防护实战
- AI Agent 的可观测性:从链路追踪到成本监控的 Java 实战
- AI Agent 的评估与优化:从基准测试到生产环境的质量守护实战
- AI Agent 的流式响应与实时交互:从 SSE 到 WebSocket 的 Java 实战(本文)
引言:你有没有想过,为什么 ChatGPT 的回答是”一个字一个字蹦出来”的?
如果你用过 ChatGPT、Claude 或者任何 AI 聊天产品,你一定注意到了一个现象:AI 的回答不是一次性全部展示出来的,而是像打字机一样,一个 token 一个 token 地”蹦”出来。
这不是炫技,也不是为了好看。这是 LLM(大语言模型)的工作方式决定的——模型是逐 token 生成的,每生成一个 token 都需要一次前向推理。如果等全部生成完再返回,用户可能要等 10 秒甚至更久才能看到第一个字。但通过流式输出,用户在 0.5 秒内就能看到第一个 token,感知延迟降低了 20 倍以上。
这就是今天要聊的主题:AI Agent 的流式响应与实时交互。
在之前的系列文章中,我们已经深入理解了 Agent 的记忆系统、工具调用、RAG 检索等核心能力。但有一个问题我们一直没正面回答:Agent 的响应怎么实时传递给用户?
这个问题看似简单,实则涉及一整条技术链路:从 LLM 的 token 流,到后端的流式协议,到前端的实时渲染,再到工具调用时的流式聚合。任何一个环节出问题,用户体验就会断崖式下降。
本文将从底层协议讲起,逐步深入到 Spring AI 和 LangChain4j 的流式 API 实现,最后给出一个完整的生产级流式 Agent 系统。
一、流式响应的技术基础:SSE vs WebSocket
在实现流式输出之前,我们需要先选择一种”管道”来把数据从服务端推送到客户端。主流方案有两种:SSE(Server-Sent Events) 和 WebSocket。
1.1 SSE:单向推送的”收音机”
SSE 是一种基于 HTTP 的单向推送协议。客户端通过普通的 HTTP 请求建立连接,服务端持续向客户端推送数据,直到连接关闭。
它的核心特点是简单:
1 2
| 客户端 --GET /stream--> 服务端 客户端 <--text/event-stream-- 服务端(持续推送)
|
SSE 的数据格式是纯文本,每条消息以 data: 开头,用 \n\n 分隔:
1 2 3 4 5 6 7
| data: {"token": "你"} \n\n data: {"token": "好"} \n\n data: {"token": "啊"} \n\n data: [DONE]
|
为什么 SSE 特别适合 LLM 流式输出?
- 基于 HTTP:不需要额外的协议升级,Nginx、CDN、负载均衡器都能正常代理
- 自动重连:浏览器原生支持
EventSource API,断线后自动重连
- 单向推送:LLM 的流式输出天然是单向的(服务端 → 客户端),不需要双向通信
- 文本友好:LLM 输出的就是文本,SSE 的文本格式完美匹配
1.2 WebSocket:双向通信的”电话”
WebSocket 是一种全双工通信协议,客户端和服务端可以随时互发消息。
1 2 3
| 客户端 --GET /ws (Upgrade)--> 服务端 客户端 <--handshake-- 服务端 客户端 <---> 服务端(双向通信)
|
WebSocket 的特点是双向和二进制支持,但它也有代价:
- 协议升级:需要从 HTTP 升级到 WebSocket,经过代理时可能有问题
- 连接管理:需要自己实现心跳、重连、断线检测
- 状态保持:WebSocket 连接是有状态的,水平扩展时需要 sticky session 或消息总线
1.3 SSE vs WebSocket:LLM 流式输出该选谁?
| 维度 |
SSE |
WebSocket |
| 通信方向 |
单向(服务端→客户端) |
双向 |
| 协议 |
HTTP/1.1 或 HTTP/2 |
独立协议(ws://) |
| 代理兼容性 |
极好(标准 HTTP) |
一般(需要升级支持) |
| 自动重连 |
浏览器原生支持 |
需手动实现 |
| 二进制支持 |
不支持 |
支持 |
| 连接开销 |
低 |
中等 |
| 适用场景 |
LLM 流式输出、通知推送 |
聊天室、游戏、实时协作 |
结论:对于 AI Agent 的流式输出,SSE 是首选方案。原因很简单——LLM 的流式输出是典型的”服务端单向推送文本”场景,SSE 的设计哲学与此完美匹配。只有在需要双向实时交互(比如用户中途打断 Agent、实时协作编辑 Agent 配置)时,才需要考虑 WebSocket。
Spring AI 的官方示例也选择了 SSE 作为流式输出的默认方案。OpenAI、Anthropic、Google 的 API 也都使用 SSE 作为流式传输协议。
二、Spring AI 的流式输出:从接口到源码
2.1 核心接口:StreamingChatModel
Spring AI 为流式输出定义了专门的接口 StreamingChatModel,与同步的 ChatModel 并列:
1 2 3 4 5 6 7 8 9 10 11
| public interface StreamingChatModel extends Model<Prompt, ChatResponse> { ChatResponse call(Prompt prompt); default Flux<ChatResponse> stream(Prompt prompt) { return stream(prompt, null); } Flux<ChatResponse> stream(Prompt prompt, ChatResponse previousChatResponse); }
|
这里有一个设计上的巧思:StreamingChatModel 同时继承了 call() 和 stream() 方法。这意味着一个 Model 实现可以同时支持同步和流式两种调用方式。在 OpenAI 的实现中,OpenAiChatModel 同时实现了 ChatModel 和 StreamingChatModel。
2.2 源码分析:OpenAI 流式调用的内部机制
让我们看看 Spring AI 中 OpenAI 的流式实现。核心逻辑在 OpenAiChatModel 类中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class OpenAiChatModel implements ChatModel, StreamingChatModel { private final OpenAiApi api; @Override public Flux<ChatResponse> stream(Prompt prompt, ChatResponse previousChatResponse) { ChatCompletionRequest request = createRequest(prompt, true); Flux<ChatCompletionChunk> chunkFlux = this.api.chatCompletionStream(request); return chunkFlux.map(this::toChatResponse) .switchOnFirst((signal, flux) -> { }); } }
|
关键步骤拆解:
- **设置
stream=true**:告诉 OpenAI API 返回 SSE 流而非完整 JSON
- 获取 chunk 流:底层 HTTP 客户端(RestClient 或 WebClient)解析 SSE 响应,每个
data: 行解析为一个 ChatCompletionChunk
- 转换为 ChatResponse:将每个 chunk 转换为 Spring AI 统一的
ChatResponse 对象
2.3 实战:构建流式聊天 API
下面是一个完整的流式聊天接口实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @RestController @RequestMapping("/api/chat") public class StreamChatController {
private final StreamingChatModel streamingChatModel;
public StreamChatController(StreamingChatModel streamingChatModel) { this.streamingChatModel = streamingChatModel; }
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> streamChat(@RequestParam String message) { Prompt prompt = new Prompt(new UserMessage(message));
return streamingChatModel.stream(prompt) .map(response -> { String content = response.getResult().getOutput().getText(); return ServerSentEvent.<String>builder() .data(content != null ? content : "") .build(); }) .concatWith(Flux.just( ServerSentEvent.<String>builder() .event("done") .data("[DONE]") .build() )) .onErrorResume(e -> { return Flux.just( ServerSentEvent.<String>builder() .event("error") .data("流式输出异常: " + e.getMessage()) .build() ); }); } }
|
前端消费示例(JavaScript):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| const eventSource = new EventSource('/api/chat/stream?message=你好');
eventSource.onmessage = (event) => { if (event.data === '[DONE]') { eventSource.close(); return; } document.getElementById('response').textContent += event.data; };
eventSource.onerror = (error) => { console.error('SSE 连接错误:', error); eventSource.close(); };
|
在之前的Tool Use 文章中,我们详细讲解了 Function Calling 的原理。但在流式场景下,Tool Call 的处理变得格外复杂。
问题在哪?
同步调用时,模型返回的 Tool Call 是一个完整的 JSON:
1 2 3 4 5 6 7 8
| { "tool_calls": [{ "function": { "name": "getWeather", "arguments": "{\"city\": \"北京\"}" } }] }
|
但在流式调用中,这个 Tool Call 被拆成了多个 chunk:
1 2 3 4
| chunk 1: {"tool_calls": [{"index": 0, "function": {"name": "get"}}]} chunk 2: {"tool_calls": [{"index": 0, "function": {"name": "Weather"}}]} chunk 3: {"tool_calls": [{"index": 0, "function": {"arguments": "{\"ci"}}]} chunk 4: {"tool_calls": [{"index": 0, "function": {"arguments": "ty\": \"北京\"}"}}]}
|
你需要把碎片拼回来,才能得到完整的 Tool Call。
Spring AI 怎么处理的?
Spring AI 在内部维护了一个聚合器(Aggregator),负责将流式 chunk 合并为完整的 ChatResponse。核心逻辑在 ChatCompletionChunkAggregator 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class ChatCompletionChunkAggregator { private final StringBuilder contentBuilder = new StringBuilder(); private final Map<Integer, ToolCallBuilder> toolCallBuilders = new HashMap<>(); public ChatResponse aggregate(ChatCompletionChunk chunk) { String delta = chunk.getChoices().get(0).getDelta().getContent(); if (delta != null) { contentBuilder.append(delta); } List<ToolCallDelta> toolCalls = chunk.getChoices().get(0).getDelta().getToolCalls(); if (toolCalls != null) { for (ToolCallDelta tc : toolCalls) { toolCallBuilders.computeIfAbsent(tc.getIndex(), k -> new ToolCallBuilder()) .appendName(tc.getFunction().getName()) .appendArguments(tc.getFunction().getArguments()); } } if (chunk.getChoices().get(0).getFinishReason() != null) { return buildFinalResponse(); } return null; } }
|
实际使用时需要注意什么?
如果你想在流式输出中处理 Tool Call,不能简单地逐 chunk 发送给客户端。你需要判断:当前 chunk 是纯文本还是 Tool Call?
- 纯文本 chunk:直接推送给客户端,实时展示
- Tool Call chunk:需要在服务端聚合,执行工具,然后将结果再次送给模型,最后把模型的二次响应流式推送
这是一个”流中断”的过程——流式输出会暂停,等工具执行完再继续。Spring AI 的 stream() 方法在内部自动处理了这个流程,但如果你自己实现 SSE Controller,需要理解这个机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@GetMapping(value = "/stream-with-tools", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> streamWithTools(@RequestParam String message) { Prompt prompt = new Prompt( new UserMessage(message), OpenAiChatOptions.builder() .function("getWeather") .function("searchDatabase") .build() );
return streamingChatModel.stream(prompt) .map(response -> { String content = response.getResult().getOutput().getText(); if (content != null && !content.isEmpty()) { return ServerSentEvent.<String>builder() .data(content) .build(); } return ServerSentEvent.<String>builder() .event("tool-call") .data("正在调用工具...") .build(); }); }
|
三、LangChain4j 的流式输出:另一种设计哲学
3.1 StreamingChatLanguageModel 与 TokenStream
LangChain4j 的流式 API 设计与 Spring AI 截然不同。Spring AI 用响应式的 Flux<T>,LangChain4j 用回调式的 TokenStream:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public interface StreamingChatLanguageModel extends ChatLanguageModel { void generate(List<ChatMessage> messages, TokenStreamHandler handler); }
public interface TokenStream { TokenStream onPartialResponse(Consumer<String> handler); TokenStream onToolCalled(Consumer<ToolExecution> handler); TokenStream onCompleteResponse(Consumer<ChatResponse> handler); TokenStream onError(ThrowableHandler handler); void start(); }
|
使用方式:
1 2 3 4 5 6 7 8 9 10 11 12 13
| StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder() .apiKey(System.getenv("OPENAI_API_KEY")) .modelName("gpt-4o") .build();
TokenStream tokenStream = model.generate( List.of(new UserMessage("你好")), handler -> handler .onPartialResponse(token -> System.out.print(token)) .onCompleteResponse(response -> System.out.println("\n完成")) .onError(error -> error.printStackTrace()) ); tokenStream.start();
|
3.2 设计哲学对比:Flux vs Callback
| 维度 |
Spring AI (Flux) |
LangChain4j (Callback) |
| 编程模型 |
响应式(Reactive Streams) |
回调式(Callback) |
| 背压支持 |
原生支持 |
不支持 |
| 组合能力 |
可用 map/filter/concat 等操作符链式处理 |
需手动在回调中处理 |
| 学习曲线 |
需要理解 Reactor |
直观,符合传统 Java 思维 |
| 与 Web 框架集成 |
Spring WebFlux 原生支持 |
需要桥接(如 CompletableFuture) |
| 错误处理 |
onErrorResume/onErrorReturn |
onError 回调 |
哪个更好? 没有绝对答案。如果你的项目已经用了 Spring WebFlux,Spring AI 的 Flux 方案更自然;如果你用的是传统的 Spring MVC,LangChain4j 的回调方式更容易上手。
不过,从工程角度看,Flux 的组合能力更强。比如你要实现”先流式输出文本,遇到 Tool Call 时暂停,执行工具后继续流式输出”,用 Flux 可以优雅地用 concatMap 实现,而回调方式需要手动管理状态机。
四、流式响应的进阶话题
4.1 错误处理:流中断了怎么办?
流式输出的最大挑战是错误处理。同步调用时,一个 try-catch 就能捕获异常;但流式调用中,错误可能发生在流的任何位置。
常见错误场景:
- 网络中断:客户端断网、服务端重启
- Token 耗尽:模型输出到一半,达到 max_tokens 限制
- 上游超时:LLM API 响应慢,连接超时
- Tool Call 失败:工具执行抛异常
Spring AI 的错误处理策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| streamingChatModel.stream(prompt) .onErrorResume(e -> { log.error("流式输出异常", e); ChatResponse fallback = new ChatResponse( List.of(new Generation(new AssistantMessage("抱歉,服务暂时不可用,请稍后重试。"))) ); return Flux.just(fallback); }) .timeout(Duration.ofSeconds(30)) .onErrorResume(TimeoutException.class, e -> { return Flux.just(new ChatResponse( List.of(new Generation(new AssistantMessage("响应超时,请重试。"))) ); });
|
关键原则:流式输出的错误处理要做到优雅降级——即使出错,也要给客户端一个明确的信号(错误事件或兜底文本),而不是突然断开连接让前端不知道发生了什么。
4.2 背压与缓冲:生产者太快怎么办?
当 LLM 的 token 生成速度超过客户端的消费速度时,就会产生背压(Backpressure)。比如:
- 模型每秒生成 100 个 token
- 客户端网络慢,每秒只能接收 20 个 token
- 多余的 80 个 token 会积压在服务端内存中
Spring AI 基于 Reactor,原生支持背压。Flux 的消费者可以通过 request(n) 告诉生产者”我准备好了,给我 n 个元素”。在 SSE 场景中,Spring WebFlux 的 SseEmitter 会自动管理背压。
但在某些场景下,你可能需要手动控制缓冲策略:
1 2 3 4 5 6 7 8 9 10
| streamingChatModel.stream(prompt) .onBackpressureDrop(overflowed -> { log.warn("背压丢弃: {}", overflowed); }) .onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_OLDEST) .bufferTimeout(10, Duration.ofMillis(100)) .map(tokens -> String.join("", tokens));
|
实践建议:大多数场景下,让 Spring WebFlux 自动管理背压即可。只有在高并发场景下(比如一个 Agent 同时服务上百个用户)才需要手动调优。
4.3 并发场景:多个用户同时请求怎么办?
Agent 系统通常是多用户并发的。每个用户的请求都需要一个独立的流式连接。这对服务端的连接管理提出了挑战。
问题:每个 SSE 连接都会占用一个线程(或在 WebFlux 中占用一个事件循环的调度)。如果 100 个用户同时请求,就需要 100 个并发流。
解决方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration public class StreamingConfig { @Bean public WebClient.Builder webClientBuilder() { return WebClient.builder() .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .clientConnector(new ReactorClientHttpConnector( HttpClient.create() .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .responseTimeout(Duration.ofSeconds(60)) .maxConnections(200) )); } }
|
同时,需要确保 LLM API 的并发限制不会成为瓶颈。OpenAI 的 API 有并发请求限制(Rate Limit),你需要在服务端做请求排队或限流:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Service public class RateLimitedChatService {
private final Semaphore semaphore = new Semaphore(50); private final StreamingChatModel streamingChatModel;
public Flux<ChatResponse> streamWithRateLimit(Prompt prompt) { return Flux.defer(() -> { if (semaphore.tryAcquire()) { return streamingChatModel.stream(prompt) .doFinally(signal -> semaphore.release()); } else { return Flux.error(new RuntimeException("请求过于繁忙,请稍后重试")); } }); } }
|
4.4 生产环境的监控指标
流式输出的监控比同步调用更复杂。你需要关注以下指标:
| 指标 |
含义 |
告警阈值建议 |
| TTFT(Time to First Token) |
从请求到收到第一个 token 的时间 |
> 3s |
| TPS(Tokens per Second) |
每秒生成的 token 数 |
< 10 |
| 流完成率 |
正常完成的流占比 |
< 95% |
| 流中断率 |
中途断开的流占比 |
> 5% |
| 并发流数 |
当前活跃的 SSE 连接数 |
> 100 |
| Tool Call 聚合耗时 |
从收到第一个 Tool Call chunk 到聚合完成的时间 |
> 2s |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Component public class StreamMetrics {
private final MeterRegistry registry; private final Timer ttftTimer; private final Counter streamCounter; private final AtomicInteger activeStreams = new AtomicInteger(0);
public StreamMetrics(MeterRegistry registry) { this.registry = registry; this.ttftTimer = Timer.builder("agent.stream.ttft") .description("Time to first token") .publishPercentiles(0.5, 0.95, 0.99) .register(registry); this.streamCounter = Counter.builder("agent.stream.total") .description("Total stream requests") .register(registry); }
public void recordStreamStart() { activeStreams.incrementAndGet(); streamCounter.increment(); }
public void recordTTFT(Duration duration) { ttftTimer.record(duration); }
public void recordStreamEnd(boolean completed) { activeStreams.decrementAndGet(); Counter.builder("agent.stream.completion") .tag("status", completed ? "success" : "interrupted") .register(registry) .increment(); } }
|
五、实战:完整的流式 Agent 系统
下面给出一个整合了 Memory、Tool Call 和流式输出的完整 Agent 系统示例。
5.1 项目依赖
1 2 3 4 5 6 7 8
| <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-openai-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
|
5.2 完整的流式 Agent Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| @RestController @RequestMapping("/api/agent") public class StreamingAgentController {
private final StreamingChatModel chatModel; private final ChatMemory chatMemory; private final WeatherService weatherService; private final StreamMetrics streamMetrics;
public StreamingAgentController( StreamingChatModel chatModel, ChatMemory chatMemory, WeatherService weatherService, StreamMetrics streamMetrics) { this.chatModel = chatModel; this.chatMemory = chatMemory; this.weatherService = weatherService; this.streamMetrics = streamMetrics; }
@GetMapping(value = "/chat/{conversationId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> streamAgentChat( @PathVariable String conversationId, @RequestParam String message) { streamMetrics.recordStreamStart(); Instant startTime = Instant.now();
List<Message> history = chatMemory.get(conversationId); List<Message> messages = new ArrayList<>(history); messages.add(new UserMessage(message)); Prompt prompt = new Prompt(messages, OpenAiChatOptions.builder() .function("getWeather") .function("searchKnowledge") .build() );
return chatModel.stream(prompt) .index() .map(tuple -> { long index = tuple.getT1(); ChatResponse response = tuple.getT2(); if (index == 0) { streamMetrics.recordTTFT( Duration.between(startTime, Instant.now()) ); } String content = response.getResult().getOutput().getText(); return ServerSentEvent.<String>builder() .id(String.valueOf(index)) .data(content != null ? content : "") .build(); }) .concatWith(Flux.just( ServerSentEvent.<String>builder() .event("done") .data("[DONE]") .build() )) .doOnComplete(() -> { chatMemory.add(conversationId, new UserMessage(message)); streamMetrics.recordStreamEnd(true); }) .onErrorResume(e -> { log.error("流式 Agent 对话异常, conversationId={}", conversationId, e); streamMetrics.recordStreamEnd(false); return Flux.just( ServerSentEvent.<String>builder() .event("error") .data("服务异常: " + e.getMessage()) .build() ); }); } }
|
5.3 流式场景下的记忆管理
在流式输出中,保存记忆有一个特殊的挑战:你需要等流完全结束后,才能把完整的 assistant 回复存入记忆。
如果在每个 chunk 到达时就保存,记忆中只会存下碎片。如果在流开始时就预分配,又会浪费内存。
最佳实践:用 doOnComplete 或 reduce 操作符聚合完整响应后再保存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| AtomicReference<StringBuilder> fullResponse = new AtomicReference<>(new StringBuilder());
chatModel.stream(prompt) .doOnNext(response -> { String content = response.getResult().getOutput().getText(); if (content != null) { fullResponse.get().append(content); } }) .doOnComplete(() -> { chatMemory.add(conversationId, new UserMessage(message)); chatMemory.add(conversationId, new AssistantMessage(fullResponse.get().toString())); });
|
六、技术的边界与未来
6.1 什么时候不该用流式输出?
流式输出不是万能的。以下场景不适合:
- API 集成场景:如果你的 Agent 是被其他服务调用的(而非直接面对用户),流式输出增加了对接方的复杂度,同步调用更简单
- 短响应场景:如果 Agent 的回答只有几个字(比如”是”或”否”),流式输出的开销反而更大
- 需要完整校验的场景:如果你需要在返回前对 Agent 的输出做安全检查(比如敏感词过滤),流式输出会让校验变得复杂——你需要在流结束后再补一次校验
6.2 未来方向
- 多模态流式输出:随着 GPT-4o、Gemini 等多模态模型的普及,流式输出将不只是文本——图片、音频也可能是流式的。想象一下,Agent 先流式输出文字描述,然后流式生成一张图片
- 流式 Tool Call 的标准化:目前不同模型提供商对流式 Tool Call 的处理方式不统一,未来可能会有统一的协议标准
- WebTransport:作为 HTTP/3 的一部分,WebTransport 提供了比 SSE 更强大的流式传输能力(支持多路复用和可靠/不可靠传输),可能成为下一代 LLM 流式协议
总结
流式响应是 AI Agent 系统中看似简单、实则复杂的一环。它不只是”把响应一个字一个字发出去”,而是涉及协议选择、Tool Call 聚合、错误处理、背压管理、并发控制等一系列工程问题。
核心要点回顾:
- SSE 是 LLM 流式输出的首选协议——基于 HTTP、自动重连、天然适合单向推送
- **Spring AI 用
Flux<ChatResponse>,LangChain4j 用 TokenStream**——前者组合能力强,后者上手简单
- 流式 Tool Call 需要聚合——模型会把一个 Tool Call 拆成多个 chunk,你需要在服务端拼回来
- 流式错误处理要优雅降级——不能让连接突然断开,要给客户端明确的信号
- 记忆保存要在流结束后进行——聚合完整响应后再存入记忆,避免碎片化
如果你正在构建一个面向用户的 AI Agent 产品,流式输出是必须支持的能力。它不是锦上添花,而是用户体验的底线。没有人愿意盯着一个空白页面等 10 秒。