系列文章
本系列持续更新,以下是已发布文章索引:
理解 AI Agent 的大脑:ReAct 模式从入门到实战
AI Agent 的灵魂对话:Prompt Engineering 系统提示词设计的艺术与工程
AI Agent 的规划大脑:从任务分解到自适应执行策略
AI Agent 的工具箱:深入理解 Tool Use 与 Spring AI Function Calling 实战
AI Agent 的记忆系统:从 ChatMemory 到持久化记忆的 Java 实战
AI Agent 的记忆力是怎么实现的——LangChain4j Memory 机制深度解析
MCP 模型上下文协议:AI 的万能接口与 MCP Server 实战
让 AI 学会”说人话”——Spring AI 结构化输出实战
从零理解 RAG:检索增强生成完整指南
Embedding 向量化的魔法:从文本到向量的数学之旅与 Java 实战
AI Agent 的知识检索引擎:从向量搜索到智能检索策略的 Java 实战
当 RAG 遇到 Agent:Agentic RAG 的架构设计与 Java 实战
当 RAG 遇上知识图谱:GraphRAG 原理与 Java 实战
AI Agent 团队协作:多 Agent 系统架构设计与 Java 实战
AI Agent 的评估与优化:从基准测试到生产环境的质量守护实战
AI Agent 的可观测性:从链路追踪到成本监控的 Java 实战
AI Agent 的安全防线:Prompt 注入防御与生产级安全防护实战
AI Agent 的流式响应与实时交互:从 SSE 到 WebSocket 的 Java 实战
Spring AI 核心架构全解析:从 ChatModel 到 Advisor Chain 的设计哲学
AI Agent 的工作流编排:从顺序链到自适应 DAG 的 Java 实战 (本文)
引言:当 ReAct 不够用的时候 在本系列的早期文章中,我们详细探讨了 ReAct 模式 ——Agent 的”思考-行动-观察”循环。ReAct 赋予了 Agent 自主决策的能力,让它可以根据环境反馈动态调整策略。但在真实的生产环境中,你会发现一个尴尬的事实:纯粹的 ReAct 循环,就像给一个实习生完全的自主权——大多数时候他能搞定,但你永远不知道他下一步会做什么 。
想象一下这个场景:你让 Agent 做一次”竞品分析报告”。ReAct 模式下,Agent 可能会:
第 1 步:搜索竞品 A 的信息 ✓
第 2 步:突然决定先去查一下行业趋势(虽然你没要求)
第 3 步:查到一半又想去分析用户评价
第 4 步:Token 快用完了,草草收尾
整个过程不可预测、不可复现、不可审计。在企业级应用中,我们需要的是”可编排的智能”——既保留 LLM 的推理能力,又让执行流程可控、可观测、可复现。
这就是今天要聊的主题:AI Agent 工作流编排 。
本文会带你:
理解为什么需要从”自由推理”走向”结构化编排”
掌握五种核心工作流模式及其适用场景
基于 Spring AI 从零构建一个生产级工作流引擎
对比 LangGraph 的设计哲学,理解不同技术路线的取舍
在真实企业场景中落地这些模式
一、从 ReAct 到 Workflow:一个认知升级 1.1 自由推理 vs 结构化编排 让我们先厘清一个关键区别:
维度
自由推理(ReAct)
结构化编排(Workflow)
控制权
Agent 自主决定每一步
开发者定义流程骨架
可预测性
低——每次执行路径不同
高——流程确定,细节由 LLM 填充
可复现性
差——同样的输入可能走不同路径
好——相同输入产生相同流程
灵活性
极高——能处理任意任务
中等——受限于预定义流程
调试难度
高——需要追踪完整推理链
低——每步独立,可单独测试
Token 消耗
不可控——可能无限循环
可控——每步有明确的 token 预算
适用场景
探索性任务、开放式问题
流程确定的企业级任务
关键洞察 :这不是”二选一”的问题。最佳实践是在 Workflow 的骨架中嵌入 ReAct 的灵活性 ——用 Workflow 定义”做什么”,用 ReAct 决定”怎么做”。
1.2 一个类比:交响乐团 vs 即兴爵士 把 ReAct 想象成爵士乐即兴演奏——乐手(Agent)根据现场氛围即兴发挥,充满创造力但每次演出都不同。
而 Workflow 编排就像交响乐团——有乐谱(流程定义),有指挥(编排引擎),每个乐手(步骤)知道自己的部分,整体协调一致,每次演出质量稳定。
生产环境需要的是”有乐谱的爵士”——框架是确定的,但每个段落内部允许即兴。
二、五种核心工作流模式 掌握了基本概念,让我们深入五种最常用的工作流模式。每种模式我都会给出:设计意图、结构图、适用场景、以及 Java 实现。
2.1 顺序链(Sequential Chain) 设计意图 :最简单的编排模式——A → B → C,前一步的输出是后一步的输入。
类比 :工厂流水线。每个工位完成一道工序,半成品依次传递。
适用场景 :
文档翻译 + 润色 + 格式化
数据采集 → 清洗 → 分析 → 报告
需求分析 → 方案设计 → 代码生成 → 代码审查
Spring AI 实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class SequentialChain { private final List<ChainStep> steps; private final ChatClient chatClient; public SequentialChain (ChatClient chatClient, List<ChainStep> steps) { this .chatClient = chatClient; this .steps = steps; } public ChainResult execute (String input) { ChainResult result = new ChainResult (); String current = input; for (int i = 0 ; i < steps.size(); i++) { ChainStep step = steps.get(i); long start = System.currentTimeMillis(); String prompt = step.getPromptTemplate() .replace("{input}" , current); String output = chatClient.prompt() .user(prompt) .system(step.getSystemPrompt()) .call() .content(); long duration = System.currentTimeMillis() - start; result.addStep(step.getName(), current, output, duration); current = output; } result.setFinalOutput(current); return result; } } public class ChainStep { private final String name; private final String systemPrompt; private final String promptTemplate; private final Map<String, Object> metadata; } public class ChainResult { private String finalOutput; private final List<StepResult> stepResults = new ArrayList <>(); private long totalDuration; public void addStep (String name, String input, String output, long duration) { stepResults.add(new StepResult (name, input, output, duration)); } }
使用示例——竞品分析流水线 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Bean public SequentialChain competitiveAnalysisChain (ChatClient.Builder builder) { ChatClient client = builder.build(); List<ChainStep> steps = List.of( new ChainStep .Builder("采集信息" ) .systemPrompt("你是一个市场研究员。请从以下需求中提取关键信息。" ) .promptTemplate("分析以下竞品的核心信息:{input}" ) .build(), new ChainStep .Builder("对比分析" ) .systemPrompt("你是一个产品分析师。请从功能、价格、用户体验三个维度进行对比。" ) .promptTemplate("基于以下信息进行竞品对比分析:{input}" ) .build(), new ChainStep .Builder("生成报告" ) .systemPrompt("你是一个商业报告撰写专家。请用结构化的方式输出报告。" ) .promptTemplate("将以下分析结果整理为正式的竞品分析报告:{input}" ) .build() ); return new SequentialChain (client, steps); }
顺序链的陷阱 :
错误传播 :第 2 步如果理解错误,第 3 步会在错误基础上继续——误差会放大
Token 浪费 :每步都完整调用 LLM,对于简单转换可能是浪费
优化建议 :在关键步骤后加”校验节点”——用另一个 LLM 调用检查上一步的输出是否合理
2.2 并行扇出(Parallel Fan-Out) 设计意图 :多个独立任务同时执行,最后汇总结果。
类比 :餐厅后厨——多个厨师同时准备不同的菜,最后由传菜员统一上桌。
适用场景 :
多维度分析(情感分析 + 关键词提取 + 摘要同时进行)
多语言翻译(中英日三语同时翻译)
多数据源聚合(同时查询多个 API,汇总结果)
Spring AI 实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class ParallelFanOut { private final ChatClient chatClient; private final ExecutorService executor; public ParallelFanOut (ChatClient chatClient) { this .executor = Executors.newVirtualThreadPerTaskExecutor(); this .chatClient = chatClient; } public FanOutResult execute (String input, List<FanBranch> branches, long timeoutMs) { FanOutResult result = new FanOutResult (); List<Future<BranchResult>> futures = branches.stream() .map(branch -> executor.submit(() -> executeBranch(branch, input))) .toList(); for (int i = 0 ; i < futures.size(); i++) { try { BranchResult branchResult = futures.get(i) .get(timeoutMs, TimeUnit.MILLISECONDS); result.addSuccess(branches.get(i).getName(), branchResult); } catch (TimeoutException e) { result.addTimeout(branches.get(i).getName()); } catch (Exception e) { result.addFailure(branches.get(i).getName(), e.getMessage()); } } return result; } private BranchResult executeBranch (FanBranch branch, String input) { long start = System.currentTimeMillis(); String output = chatClient.prompt() .user(branch.getPromptTemplate().replace("{input}" , input)) .system(branch.getSystemPrompt()) .call() .content(); return new BranchResult (output, System.currentTimeMillis() - start); } } public class FanBranch { private final String name; private final String systemPrompt; private final String promptTemplate; private final boolean required; }
使用示例——多维度内容分析 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Bean public ParallelFanOut contentAnalysisChain (ChatClient.Builder builder) { ChatClient client = builder.build(); List<FanBranch> branches = List.of( new FanBranch .Builder("情感分析" ) .systemPrompt("分析文本的情感倾向,输出 JSON:{\"sentiment\": \"positive/negative/neutral\", \"score\": 0.0-1.0}" ) .promptTemplate("分析以下文本的情感:{input}" ) .required(true ) .build(), new FanBranch .Builder("关键词提取" ) .systemPrompt("提取文本中的核心关键词,输出 JSON 数组" ) .promptTemplate("提取以下文本的关键词:{input}" ) .required(true ) .build(), new FanBranch .Builder("摘要生成" ) .systemPrompt("用一句话总结文本核心内容" ) .promptTemplate("总结以下文本:{input}" ) .required(false ) .build(), new FanBranch .Builder("分类标签" ) .systemPrompt("为文本打上分类标签,从以下类别中选择:技术、商业、产品、市场、其他" ) .promptTemplate("分类以下文本:{input}" ) .required(false ) .build() ); return new ParallelFanOut (client); }
并行扇出的关键设计决策 :
超时策略 :不是所有分支都同等重要。required=true 的分支超时应该重试,required=false 的直接跳过
结果合并 :多个分支的输出格式可能不同——需要一个统一的 MergeStrategy
成本控制 :并行意味着同时发起多个 LLM 调用,Token 成本翻倍。只在真正需要并行时才用
2.3 路由分发(Router) 设计意图 :根据输入内容,动态选择不同的处理路径。
类比 :医院分诊台——根据症状把病人分到不同科室。
适用场景 :
客服系统:根据问题类型路由到不同的处理流程
代码生成:根据语言选择不同的模板和规范
内容审核:不同类型内容走不同的审核策略
Spring AI 实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class LlmRouter { private final ChatClient chatClient; private final Map<String, RouteHandler> routes; private final RouteHandler defaultRoute; public LlmRouter (ChatClient chatClient, Map<String, RouteHandler> routes, RouteHandler defaultRoute) { this .chatClient = chatClient; this .routes = routes; this .defaultRoute = defaultRoute; } public RouteResult route (String input) { String classification = chatClient.prompt() .system(""" 你是一个意图分类器。根据用户输入,返回以下类别之一: %s 只返回类别名称,不要其他内容。 """ .formatted(String.join(", " , routes.keySet()))) .user(input) .call() .content() .trim(); RouteHandler handler = routes.getOrDefault(classification, defaultRoute); if (handler == null ) { return RouteResult.error("未找到路由: " + classification); } long start = System.currentTimeMillis(); String output = handler.handle(input, chatClient); long duration = System.currentTimeMillis() - start; return new RouteResult (classification, output, duration); } } @FunctionalInterface public interface RouteHandler { String handle (String input, ChatClient chatClient) ; }
使用示例——智能客服路由 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Bean public LlmRouter customerServiceRouter (ChatClient.Builder builder) { ChatClient client = builder.build(); Map<String, RouteHandler> routes = Map.of( "技术问题" , (input, chat) -> chat.prompt() .system("你是一个技术支持工程师。请根据问题提供解决方案或建议联系人工技术支持。" ) .user(input) .call() .content(), "账务问题" , (input, chat) -> chat.prompt() .system("你是一个账务专员。请解答账务相关问题,涉及金额操作必须建议联系人工客服。" ) .user(input) .call() .content(), "投诉建议" , (input, chat) -> chat.prompt() .system("你是一个客户关怀专员。请认真倾听客户诉求,表达同理心,并记录问题转交相关部门。" ) .user(input) .call() .content(), "产品咨询" , (input, chat) -> { return chat.prompt() .system("你是一个产品顾问。请基于产品知识库回答客户咨询。" ) .user(input) .call() .content(); } ); RouteHandler defaultRoute = (input, chat) -> "抱歉,我暂时无法处理您的问题。正在为您转接人工客服,请稍候..." ; return new LlmRouter (client, routes, defaultRoute); }
路由的进阶技巧 :
多级路由 :第一级分大类,第二级分小类——像医院的”内科 → 心内科”
置信度阈值 :如果 LLM 分类置信度低(可以用 few-shot 让 LLM 返回 confidence),低于阈值时直接转人工
路由缓存 :相同类型的输入不需要每次都重新分类——用 embedding 相似度做快速路由
2.4 编排者-工作者(Orchestrator-Worker) 设计意图 :一个”编排者”Agent 动态生成任务计划,多个”工作者”Agent 并行执行子任务,最后编排者汇总结果。
类比 :项目经理 + 开发团队。PM 拆解需求、分配任务、跟踪进度、最终整合交付物。
这是最强大也最复杂的模式,它融合了我们在规划大脑 和多 Agent 协作 中讨论的概念。
适用场景 :
复杂研究报告生成(需要多个维度的独立研究)
大型代码重构(拆解为多个独立模块并行修改)
多语言内容本地化(每种语言独立处理)
Spring AI 实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 public class OrchestratorWorkerEngine { private final ChatClient orchestratorClient; private final ChatClient workerClient; private final ExecutorService executor; private final int maxWorkers; public OrchestratorWorkerEngine (ChatClient.Builder builder, int maxWorkers) { this .orchestratorClient = builder.build(); this .workerClient = builder.build(); this .executor = Executors.newVirtualThreadPerTaskExecutor(); this .maxWorkers = maxWorkers; } public OrchestratorResult execute (String task) { TaskPlan plan = generatePlan(task); List<WorkerResult> results = executeWorkers(plan.getSubTasks()); String finalOutput = synthesizeResults(task, results); return new OrchestratorResult (plan, results, finalOutput); } private TaskPlan generatePlan (String task) { return orchestratorClient.prompt() .system(""" 你是一个任务规划专家。根据用户的任务,将其拆解为可独立执行的子任务。 规则: 1. 每个子任务必须是独立的,不依赖其他子任务的输出 2. 子任务数量不超过 %d 个 3. 每个子任务有明确的输入和预期输出格式 4. 按优先级排序(并行执行时高优先级先开始) """ .formatted(maxWorkers)) .user(task) .call() .entity(TaskPlan.class); } private List<WorkerResult> executeWorkers (List<SubTask> subTasks) { List<Future<WorkerResult>> futures = subTasks.stream() .map(task -> executor.submit(() -> { int retries = task.isCritical() ? 2 : 0 ; return executeWithRetry(task, retries); })) .toList(); return futures.stream() .map(f -> { try { return f.get(60 , TimeUnit.SECONDS); } catch (Exception e) { return WorkerResult.failed("timeout" , e.getMessage()); } }) .toList(); } private WorkerResult executeWithRetry (SubTask task, int retries) { for (int attempt = 0 ; attempt <= retries; attempt++) { try { long start = System.currentTimeMillis(); String output = workerClient.prompt() .system(task.getSystemPrompt()) .user(task.getInput()) .call() .content(); return WorkerResult.success( task.getId(), output, System.currentTimeMillis() - start, attempt ); } catch (Exception e) { if (attempt == retries) { return WorkerResult.failed(task.getId(), e.getMessage()); } } } return WorkerResult.failed(task.getId(), "max retries exceeded" ); } private String synthesizeResults (String originalTask, List<WorkerResult> results) { String resultsText = results.stream() .filter(WorkerResult::isSuccess) .map(r -> "【" + r.getTaskId() + "】\n" + r.getOutput()) .collect(Collectors.joining("\n\n" )); return orchestratorClient.prompt() .system("你是一个报告整合专家。请将多个子任务的结果整合为一份连贯的最终输出。" ) .user("原始任务:%s\n\n各子任务结果:\n%s" .formatted(originalTask, resultsText)) .call() .content(); } } @JsonClassDescription("任务执行计划") public record TaskPlan ( @JsonPropertyDescription("任务总体目标的简要描述") String objective, @JsonPropertyDescription("拆解后的子任务列表") List<SubTask> subTasks ) {} @JsonClassDescription("子任务定义") public record SubTask ( @JsonPropertyDescription("子任务唯一 ID") String id, @JsonPropertyDescription("子任务描述") String description, @JsonPropertyDescription("系统提示词") String systemPrompt, @JsonPropertyDescription("输入内容") String input, @JsonPropertyDescription("是否是关键任务(失败则整体失败)") boolean critical, @JsonPropertyDescription("优先级,数字越小越优先") int priority ) {}
编排者-工作者模式的核心问题:如何让编排者”看懂”工作者的结果?
这其实是一个被很多人忽略的问题。工作者可能返回格式不一致、信息不完整、甚至自相矛盾的内容。编排者需要:
验证工作者输出 :检查是否符合预期格式
冲突检测 :如果两个工作者的结论矛盾,需要决策
质量评估 :评估每个工作者输出的质量,低质量的需要重做
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ResultQualityEvaluator { private final ChatClient chatClient; public QualityScore evaluate (String task, String output) { return chatClient.prompt() .system(""" 你是一个质量评估专家。请评估以下任务输出的质量。 评分维度(每项 1-10 分): 1. 完整性:是否完整回答了任务要求 2. 准确性:信息是否准确,有无明显错误 3. 连贯性:逻辑是否通顺,表述是否清晰 返回 JSON 格式的评分结果。 """ ) .user("任务描述:%s\n\n任务输出:\n%s" .formatted(task, output)) .call() .entity(QualityScore.class); } }
2.5 人机协作(Human-in-the-Loop) 设计意图 :在关键决策点暂停执行,等待人类审核或输入,然后继续。
类比 :自动驾驶的 L2 级别——大部分时间自动行驶,但关键时刻会提醒驾驶员接管。
适用场景 :
内容发布前的人工审核
大额交易的确认
法律文件的最终审批
高风险操作的二次确认
Spring AI 实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public class HumanInLoopEngine { private final ChatClient chatClient; private final CheckpointStore checkpointStore; public ExecutionState execute (String workflowId, List<WorkflowStep> steps, String input) { ExecutionState state = checkpointStore.load(workflowId) .orElse(new ExecutionState (workflowId, input)); for (int i = state.getCurrentStepIndex(); i < steps.size(); i++) { WorkflowStep step = steps.get(i); if (step.requiresHumanReview()) { String draft = executeStep(step, state.getCurrentInput()); state.pauseAt(i, draft, step.getReviewPrompt()); checkpointStore.save(state); return state; } String output = executeStep(step, state.getCurrentInput()); state.advance(i + 1 , output); } state.markComplete(); checkpointStore.save(state); return state; } public ExecutionState resume (String workflowId, HumanFeedback feedback) { ExecutionState state = checkpointStore.load(workflowId) .orElseThrow(() -> new IllegalStateException ("流程不存在: " + workflowId)); String enrichedInput = """ 原始输出: %s 人类审核意见: %s """ .formatted(state.getPausedOutput(), feedback.getComments()); if (feedback.isApproved()) { state.advance(state.getCurrentStepIndex() + 1 , state.getPausedOutput()); } else if (feedback.hasModifications()) { String revised = executeStep( state.getCurrentStep(), enrichedInput ); state.advance(state.getCurrentStepIndex() + 1 , revised); } else { state.markRejected(feedback.getReason()); checkpointStore.save(state); return state; } return execute(state.getWorkflowId(), state.getRemainingSteps(), state.getCurrentInput()); } }
人机协作的工程挑战 :
状态持久化 :人类审核可能需要几小时甚至几天。必须将中间状态持久化到数据库,而不是存在内存里。
超时处理 :人类可能永远不审核。需要设置超时策略——超时后自动通过、自动拒绝、或升级给更高级别的人。
并发控制 :同一流程可能被多个人同时审核——需要乐观锁或悲观锁。
审核界面 :纯 CLI 不够——需要一个 Web 界面让审核者看到 LLM 的输出、上下文、以及操作按钮(批准/修改/拒绝)。
三、从零构建工作流引擎 理解了五种模式后,让我们把它们整合成一个统一的工作流引擎。这不是一个玩具 demo——而是可以直接用在生产环境的设计。
3.1 核心抽象:WorkflowDefinition 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class WorkflowDefinition { private final String name; private final List<WorkflowNode> nodes; private final Map<String, Transition> transitions; public static class WorkflowNode { private final String id; private final NodeType type; private final String prompt; private final String systemPrompt; private final boolean requiresReview; private final RetryPolicy retryPolicy; private final Duration timeout; private final Map<String, Object> metadata; } public static class Transition { private final String fromNodeId; private final String defaultNextNodeId; private final List<Condition> conditions; public record Condition ( String expression, // 简单的条件表达式 String nextNodeId // 条件满足时跳转到的节点 ) {} } }
3.2 执行引擎:WorkflowEngine 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 @Service public class WorkflowEngine { private final ChatClient chatClient; private final WorkflowStateRepository stateRepository; private final ApplicationEventPublisher eventPublisher; public WorkflowExecution run (WorkflowDefinition definition, Map<String, Object> inputs) { WorkflowExecution execution = WorkflowExecution.start(definition, inputs); while (execution.hasNextNode()) { WorkflowNode currentNode = execution.getCurrentNode(); eventPublisher.publishEvent(new NodeStartedEvent ( execution.getId(), currentNode.getId() )); try { NodeResult result = switch (currentNode.getType()) { case LLM -> executeLlmNode(currentNode, execution.getContext()); case HUMAN -> pauseForHuman(execution, currentNode); case API -> executeApiNode(currentNode, execution.getContext()); case PARALLEL_GROUP -> executeParallelGroup( currentNode, execution.getContext() ); }; execution.recordResult(currentNode.getId(), result); String nextNodeId = resolveTransition( definition.getTransitions().get(currentNode.getId()), result ); execution.moveToNext(nextNodeId); } catch (Exception e) { if (shouldRetry(currentNode, execution)) { execution.retryCurrentNode(); continue ; } execution.markFailed(currentNode.getId(), e); break ; } } stateRepository.save(execution); return execution; } private NodeResult executeLlmNode (WorkflowNode node, ExecutionContext context) { long start = System.currentTimeMillis(); String resolvedPrompt = resolveTemplate(node.getPrompt(), context); String resolvedSystem = resolveTemplate(node.getSystemPrompt(), context); String output = chatClient.prompt() .system(resolvedSystem) .user(resolvedPrompt) .call() .content(); return new NodeResult ( output, Map.of("duration_ms" , System.currentTimeMillis() - start, "token_count" , estimateTokens(resolvedPrompt + output)) ); } private NodeResult pauseForHuman (WorkflowExecution execution, WorkflowNode node) { execution.pause(node.getId()); stateRepository.save(execution); throw new WorkflowPausedException (execution.getId(), node.getId()); } private String resolveTransition (Transition transition, NodeResult result) { if (transition == null ) return null ; if (transition.getConditions().isEmpty()) { return transition.getDefaultNextNodeId(); } for (Transition.Condition condition : transition.getConditions()) { if (evaluateCondition(condition.expression(), result)) { return condition.nextNodeId(); } } return transition.getDefaultNextNodeId(); } }
3.3 使用示例——构建一个完整的内容发布流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Configuration public class ContentPublishWorkflow { @Bean public WorkflowDefinition contentPublishWorkflow () { return WorkflowDefinition.builder() .name("内容发布流程" ) .node(WorkflowNode.builder() .id("draft" ) .type(NodeType.LLM) .systemPrompt("你是一个技术博客作者,请根据主题撰写一篇技术文章。" ) .prompt("主题:{{topic}}\n\n要求:{{requirements}}" ) .timeout(Duration.ofMinutes(5 )) .build()) .node(WorkflowNode.builder() .id("auto-review" ) .type(NodeType.LLM) .systemPrompt(""" 你是一个文章审查专家。请检查以下方面: 1. 格式是否规范(标题层级、代码块、列表) 2. SEO 元素是否完整(标题、摘要、关键词) 3. 内部链接是否使用了正确的格式(/posts/abbrlink.html) 4. 文字和代码的一致性 返回 JSON:{"passed": boolean, "issues": [...]} """ ) .prompt("请审查以下文章:\n{{draft}}" ) .build()) .node(WorkflowNode.builder() .id("human-review" ) .type(NodeType.HUMAN) .prompt("请审核以下文章(自动审查发现了问题):\n{{draft}}\n\n问题列表:{{auto-review}}" ) .requiresReview(true ) .build()) .node(WorkflowNode.builder() .id("publish" ) .type(NodeType.API) .prompt("发布文章:{{final_article}}" ) .build()) .transition("draft" , "auto-review" ) .transition("auto-review" , Transition.builder() .defaultNext("publish" ) .condition("$.passed == false" , "human-review" ) .build()) .transition("human-review" , "publish" ) .build(); } }
四、横向对比:Spring AI vs LangGraph 在 Python 生态中,LangGraph 是 LangChain 团队推出的图编排框架,专门解决 Agent 工作流问题。让我们做一个深入对比,帮助你做出技术选型。
4.1 架构哲学对比
维度
LangGraph(Python)
Spring AI(Java)
核心抽象
有向图(StateGraph)
Advisor Chain + 自定义编排
状态管理
内置 State 对象,自动在节点间传递
需要自己设计 Context 对象
检查点
内置 Checkpointer(SQLite/Postgres)
需要自己实现状态持久化
人机协作
内置 interrupt_before/interrupt_after
需要自己设计暂停/恢复机制
并行执行
基于 asyncio,原生支持
基于虚拟线程(Java 21+),性能更好
流式输出
原生支持 node 级流式
通过 SSE/WebSocket 实现(见本系列流式响应篇 )
生态集成
LangChain 全家桶
Spring 生态(Spring Boot、Spring Cloud)
类型安全
弱(Python 动态类型)
强(Java 编译时检查)
生产部署
需要额外的 FastAPI/Flask
Spring Boot 开箱即用
4.2 LangGraph 的图编排示例 为了让你直观理解差异,这里给出 LangGraph 的等价实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from langgraph.graph import StateGraph, ENDclass WorkflowState (TypedDict ): topic: str draft: str review_result: dict final_article: str def draft_node (state: WorkflowState ) -> dict : """生成初稿""" response = llm.invoke(f"写一篇关于 {state['topic' ]} 的文章" ) return {"draft" : response.content} def review_node (state: WorkflowState ) -> dict : """自动审查""" response = llm.invoke(f"审查文章:{state['draft' ]} " ) return {"review_result" : json.loads(response.content)} def human_review_node (state: WorkflowState ) -> dict : """人工审核——这里会暂停,等待外部输入""" pass def publish_node (state: WorkflowState ) -> dict : """发布""" publish_article(state.get("final_article" ) or state["draft" ]) return {} workflow = StateGraph(WorkflowState) workflow.add_node("draft" , draft_node) workflow.add_node("review" , review_node) workflow.add_node("human_review" , human_review_node) workflow.add_node("publish" , publish_node) workflow.set_entry_point("draft" ) workflow.add_edge("draft" , "review" ) workflow.add_conditional_edges( "review" , lambda state: "human_review" if not state["review_result" ]["passed" ] else "publish" , {"human_review" : "human_review" , "publish" : "publish" } ) workflow.add_edge("human_review" , "publish" ) workflow.add_edge("publish" , END) app = workflow.compile (checkpointer=SqliteSaver.from_conn_string(":memory:" )) config = {"configurable" : {"thread_id" : "1" }} result = app.invoke({"topic" : "AI Agent 工作流" }, config)
4.3 选型建议 选 LangGraph 当 :
团队是 Python 技术栈
需要快速原型验证
需要复杂的图结构(环、子图嵌套)
已经深度使用 LangChain 生态
选 Spring AI + 自定义编排当 :
团队是 Java 技术栈(这是最大的决定因素)
需要和现有的 Spring Boot 微服务集成
对类型安全和编译时检查有要求
需要利用 Java 21 虚拟线程的高并发优势
生产环境需要成熟的监控、配置管理、服务发现等 Spring 生态能力
我的建议 :不要为了”图编排”的噱头强行引入新语言栈。如果你的团队是 Java,用 Spring AI + 自定义编排引擎完全够用。LangGraph 的核心优势(状态管理、检查点)用 Java 代码实现并不复杂——本文已经给出了完整的实现。
五、生产环境最佳实践 5.1 可观测性集成 工作流引擎必须和可观测性体系打通(详见本系列可观测性篇 )。每个节点执行时自动记录:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Component public class WorkflowTracingInterceptor { private final MeterRegistry meterRegistry; public <T> T intercept (String workflowId, String nodeId, Supplier<T> action) { Span span = Span.current(); Timer.Sample sample = Timer.start(meterRegistry); try { T result = action.get(); sample.stop(Timer.builder("workflow.node.duration" ) .tag("workflow" , workflowId) .tag("node" , nodeId) .tag("status" , "success" ) .register(meterRegistry)); return result; } catch (Exception e) { sample.stop(Timer.builder("workflow.node.duration" ) .tag("workflow" , workflowId) .tag("node" , nodeId) .tag("status" , "error" ) .register(meterRegistry)); throw e; } } }
5.2 成本控制 工作流中多个 LLM 调用的 Token 成本会快速累积。关键策略:
节点级 Token 预算 :每个节点设置最大 Token 限制,超出时截断或降级
模型分级 :简单节点用小模型(GPT-3.5 / 本地模型),关键节点用强模型(GPT-4)
缓存 :相同输入的 LLM 调用结果缓存(特别是路由分类、格式检查等确定性较高的任务)
并行成本可见 :并行扇出的成本 = 分支数 × 单次调用成本,必须在设计时就评估
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CostAwareExecutor { private static final int DEFAULT_TOKEN_BUDGET = 10000 ; public NodeResult executeWithBudget (WorkflowNode node, ExecutionContext context, int remainingBudget) { if (remainingBudget <= 0 ) { return fallbackExecute(node, context); } int maxTokens = Math.min(remainingBudget, node.getMaxTokens()); return executeLlmNode(node, context, maxTokens); } }
5.3 错误处理策略 工作流中的错误处理比单次 LLM 调用复杂得多——你需要考虑:
错误类型
策略
实现
LLM API 超时
指数退避重试(最多 3 次)
RetryPolicy + CircuitBreaker
LLM 输出格式错误
重新生成(提示中强调格式要求)
结构化输出 + 校验重试
并行分支部分失败
容忍(标记降级,继续执行)
FanOutResult.partialSuccess
人工审核超时
升级(通知更高级别审核者)
ScheduledTask + 通知服务
总流程超时
终止(返回已完成的部分结果)
WorkflowExecution.timeout
5.4 测试策略 工作流的测试比普通代码复杂,因为涉及 LLM 调用。推荐分层测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @SpringBootTest class ContentPublishWorkflowTest { @MockBean private ChatClient chatClient; @Test void should_route_to_human_review_when_auto_review_fails () { when (chatClient.prompt().system(any()).user(any()).call().content()) .thenReturn("文章初稿内容..." ) .thenReturn("{\"passed\": false, \"issues\": [\"缺少代码示例\"]}" ); WorkflowExecution result = engine.run( contentPublishWorkflow, Map.of("topic" , "AI Agent" , "requirements" , "深入讲解" ) ); assertThat(result.getStatus()).isEqualTo(ExecutionStatus.PAUSED); assertThat(result.getCurrentNodeId()).isEqualTo("human-review" ); } @Test void should_skip_human_review_when_auto_review_passes () { when (chatClient.prompt().system(any()).user(any()).call().content()) .thenReturn("文章初稿内容..." ) .thenReturn("{\"passed\": true, \"issues\": []}" ); WorkflowExecution result = engine.run( contentPublishWorkflow, Map.of("topic" , "AI Agent" , "requirements" , "深入讲解" ) ); assertThat(result.getStatus()).isEqualTo(ExecutionStatus.COMPLETED); assertThat(result.getExecutedNodeIds()) .containsExactly("draft" , "auto-review" , "publish" ); } }
六、模式组合:真实企业案例 在实际项目中,你很少只用一种模式。更常见的是模式组合 ——一个工作流内部混合使用多种模式。
案例:智能合同审查系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Configuration public class ContractReviewWorkflow { @Bean public WorkflowDefinition contractReview () { return WorkflowDefinition.builder() .name("智能合同审查" ) .parallelGroup("parallel-review" , llmNode("legal-review" , "你是一个法律顾问..." , "{{contract}}" ), llmNode("financial-review" , "你是一个财务审计师..." , "{{contract}}" ), llmNode("technical-review" , "你是一个技术架构师..." , "{{contract}}" ) ) .node(llmNode("risk-assessment" , "根据三方审查结果评估总体风险等级:低风险/中风险/高风险" , "法律审查:{{legal-review}}\n财务审查:{{financial-review}}\n技术审查:{{technical-review}}" )) .transition("risk-assessment" , Transition.builder() .defaultNext("generate-report" ) .condition("$.risk_level == '高风险'" , "escalate-to-partner" ) .build()) .orchestrator("generate-report" , List.of( workerNode("exec-summary" , "生成执行摘要..." ), workerNode("risk-detail" , "生成风险详情..." ), workerNode("recommendation" , "生成建议..." ) )) .node(humanNode("legal-signoff" , "请法务确认审查结果" )) .node(humanNode("business-signoff" , "请业务方确认审查结果" )) .build(); } }
这个案例展示了如何在同一个工作流中灵活组合五种模式——这才是真实项目的常态。
七、边界与展望 7.1 工作流编排的边界 工作流不是万能的。以下场景不适合过度编排:
高度探索性的任务 :如”帮我调研一个未知领域的技术方案”——你不知道需要多少步、每步做什么,这时候 ReAct 模式 更合适
单轮对话 :简单的问答、翻译、摘要——不需要工作流,直接调 LLM 就行
频繁变化的流程 :如果业务流程每周都在变,维护 WorkflowDefinition 的成本可能超过收益
判断标准 :如果你的流程有 3 个以上的步骤,涉及多个 LLM 调用,需要错误处理和人工审核——上工作流。否则,用简单的顺序调用就够了。
7.2 未来方向
自适应工作流 :工作流根据执行过程中的中间结果动态调整——不是预定义的条件跳转,而是 LLM 自己决定下一步该做什么。这本质上是在 Workflow 中嵌入 ReAct。
工作流即代码(Workflow-as-Code) :用 DSL 或自然语言描述工作流,由 LLM 自动生成 WorkflowDefinition。LangGraph 已经在探索这个方向。
多模态工作流 :不仅处理文本,还处理图像、音频、视频。例如:上传产品图 → AI 生成描述 → 翻译成多语言 → 生成不同平台的素材。
联邦工作流 :多个组织各自运行工作流的一部分,通过安全协议交换中间结果。例如:医院和保险公司各自审查合同的不同部分。
总结 本文从”当 ReAct 不够用”这个真实痛点出发,系统地讲解了 AI Agent 工作流编排的五种核心模式:
顺序链 ——最简单的流水线,A → B → C
并行扇出 ——独立任务同时执行,适合多维度分析
路由分发 ——根据输入动态选择处理路径
编排者-工作者 ——最强大的模式,动态拆解 + 并行执行 + 智能汇总
人机协作 ——关键决策点引入人类审核
关键认知升级:
工作流 ≠ 限制 Agent 的灵活性 ,而是在确定性骨架中嵌入灵活性
Spring AI + 自定义编排引擎完全能满足 Java 生态的需求 ,不需要为了 LangGraph 引入 Python
生产环境需要的是可观测、可复现、可审计的工作流 ,不是 demo 级的简单链式调用
模式组合 才是真实项目的常态——一个工作流内部混合使用多种模式
下一篇,我们将探讨 AI Agent 的成本优化 ——当你的 Agent 每天处理上万次请求时,如何在质量和成本之间找到平衡点。
本文是”AI Agent 全栈实战”系列的第 20 篇。如果你觉得有收获,欢迎关注博客 blog.goku.top 获取更新通知。