Step 5:工作流编排
适用读者:需要构建多步骤、多 Agent 自动化流程的开发者 前置知识:理解 AIAgent 和基本工具调用 本文目标:从顺序链到并行/条件/扇入/切换,彻底掌握 MAF Workflows 的控制流能力
单个 Agent 的单次调用能做的事情有限。真实业务中往往是多步骤的:
"帮我处理退款,订单号 ORD-2024-001"
背后可能是:查订单 → 验证退款资格 → 计算金额 → 执行退款 → 发通知 → 记入 CRM。而且中间可能有分支——如果订单不符合退款条件,要跳到拒绝流程;还要并行——查订单和查物流同时进行;还要根据上一个步骤的结果决定下一步怎么走。
工作流核心概念
Microsoft Agent Framework Workflows 提供了一套完整的有向图编排引擎,核心组件:
| 概念 | 说明 |
|---|---|
| Executor | 处理单元(可以是 AI Agent 或自定义逻辑) |
| Edge | 连接 Executor 的路径,决定消息流向 |
| WorkflowBuilder | 用 Fluent API 构建有向图 |
| InProcessExecution | 执行工作流,支持流式和阻塞模式 |
| Superstep | 执行的基本单位——一个超步内所有 Executor 并行执行,之后同步等待 |
| Shared State | 跨 Executor 的共享状态,用于传递中间结果 |
两种 API 的选择
| 维度 | 函数式 API | 图形 API (WorkflowBuilder) |
|---|---|---|
| 控制流 | 原生 C#(if/for/await) | Edges 和 Conditions |
| 适合场景 | 顺序流水线、自定义循环 | 固定图、扇出/扇入、类型校验路由 |
| 并行 | Task.WhenAll | 并行边组 + Superstep |
| 人机协同 | 直接代码 | RequestInfoExecutor |
| Checkpoint | 每 @step | 超步边界 |
一句话建议:如果工作流拓扑固定且类型路由复杂,用图形 API;如果需要动态条件或灵活循环,用函数式 API。两者可以混合。
Executor:工作流的基本单元
Executor 是工作流的处理单元,有两种定义方式。
方式一:继承 Executor 基类 + [MessageHandler]
这是推荐的编译时源码生成方式:
using Microsoft.Agents.AI.Workflows;
// 定义注意:类必须是 partial,方法加 [MessageHandler] 属性
internal sealed partial class OrderLookupExecutor : Executor
{
public OrderLookupExecutor() : base("OrderLookup") { }
// 方法签名必须为: private async ValueTask<TOutput> HandleAsync(
// TInput message, IWorkflowContext context, CancellationToken)
[MessageHandler]
private async ValueTask<OrderInfo> HandleAsync(
ChatMessage message,
IWorkflowContext context,
CancellationToken cancellationToken)
{
// 用 AI Agent 或直接逻辑处理
var orderInfo = await LookupOrderAsync(message.Text);
return orderInfo;
}
}方式二:BindAsExecutor 扩展方法
适合把已有的简单函数包装为 Executor:
Func<string, string> uppercase = s => s.ToUpperInvariant();
var uppercaseExecutor = uppercase.BindAsExecutor("ToUpper");Agent Executor
把 AI Agent 包装为 Executor 非常直接——使用 AgentExecutor 或直接在 [MessageHandler] 中调用 Agent:
internal sealed partial class RefundEligibilityExecutor : Executor
{
private readonly AIAgent _agent;
public RefundEligibilityExecutor(AIAgent agent) : base("CheckEligibility")
{
_agent = agent;
}
[MessageHandler]
private async ValueTask<EligibilityResult> HandleAsync(
OrderInfo message,
IWorkflowContext context,
CancellationToken ct)
{
var response = await _agent.RunAsync(
$"判断订单 {message.OrderId} 的退款资格", ct);
// 把结果写到共享状态,供后续 Executor 读取
await context.QueueStateUpdateAsync(
$"eligibility-{message.OrderId}",
new EligibilityResult { /* ... */ },
scopeName: "RefundScope");
return JsonSerializer.Deserialize<EligibilityResult>(response.Text);
}
}Edge:控制流的灵魂
Edge 定义了消息如何在 Executor 之间流动。MAF Workflows 支持 5 种 Edge 类型:
| 类型 | 说明 | 使用场景 |
|---|---|---|
| Direct | 一对一连接,无条件 | 顺序流水线 |
| Conditional | 带条件的边,满足条件才流转 | if/else 二分路由 |
| Switch-Case | 多分支路由 | 三分及以上路由(含 default) |
| Fan-out (Multi-Selection) | 一个源发往多个目标 | 并行处理 |
| Fan-in (Barrier) | 多个源汇聚到一个目标 | 聚合/等待所有并行完成 |
下面逐一演示。
Direct Edge:顺序链
最简单的形式——一个接一个执行:
WorkflowBuilder builder = new(orderLookup);
builder.AddEdge(orderLookup, refundCheck); // 查订单 → 验资格
builder.AddEdge(refundCheck, processRefund); // 验资格 → 处理退款
builder.AddEdge(processRefund, notifyUser); // 处理退款 → 通知用户
builder.WithOutputFrom(notifyUser);
var workflow = builder.Build();流程图:
OrderLookup ──→ RefundCheck ──→ ProcessRefund ──→ NotifyUser ──→ (输出)Conditional Edge:条件分支(if/else)
核心需求:"根据上一个 Executor 的结果,决定走哪条路"——Conditional Edge 就是为此而生。
场景:根据订单验证结果决定走"退款"还是"拒绝"
定义条件函数:
// 条件工厂:返回一个判断消息是否符合预期的函数
private static Func<object?, bool> GetCondition(bool expectedEligible) =>
message => message is EligibilityResult result
&& result.IsEligible == expectedEligible;用条件连接 Edge:
// 第一个参数:源 Executor
// 第二个参数:目标 Executor
// 第三个参数(可选):条件函数,返回 true 时才流转
WorkflowBuilder builder = new(orderLookup);
// 符合条件的边:isEligible == true → 走退款流程
builder.AddEdge(orderLookup, processRefund,
condition: GetCondition(expectedEligible: true));
// 不符合条件的边:isEligible == false → 走拒绝流程
builder.AddEdge(orderLookup, rejectRefund,
condition: GetCondition(expectedEligible: false));
builder.WithOutputFrom(processRefund, rejectRefund);完整流程图:
┌─── (isEligible=true) ─── ProcessRefund ────┐
│ │
OrderLookup ├──→ 输出
│ │
└─── (isEligible=false) ── RejectRefund ───────┘⚡ 关键设计点
- 条件是独立的 Edge 属性,不是 Executor 的一部分。一个 Executor 可以有多条出边,每条有自己的条件。
- Type-safe 求值:条件函数接收
object?,第一件事就是类型转换并检查属性。不匹配的消息不会引发异常,只是条件返回 false。 - 无匹配 = 消息丢弃:如果所有出边的条件都返回 false,该消息被静默丢弃(不会报错,但也不会转发)。
- 多条出边可命中:如果多个条件同时为 true,消息会复制发给所有目标 Executor(实现扇出)。
Switch-Case Edge:多分支路由
当分支超过 2 条时,用多个 Conditional Edge 会显得啰嗦。Switch-Case 提供更简洁的语法。
场景:邮件分类——垃圾/正常/不确定
定义枚举和条件工厂:
public enum SpamDecision { NotSpam, Spam, Uncertain }
private static Func<object?, bool> GetCondition(SpamDecision expected) =>
message => message is DetectionResult r && r.SpamDecision == expected;用 AddSwitchCase 构建多分支:
// 构建 Switch 路由:一个源 → 三个目标
builder.AddSwitchCase(spamDetector,
(GetCondition(SpamDecision.NotSpam), emailAssistant),
(GetCondition(SpamDecision.Spam), handleSpam),
defaultCase: handleUncertain // 可选默认路由
); ┌── NotSpam ─── EmailAssistant ── EmailResponseExecutor
│
SpamDetector ──┼── Spam ───── HandleSpamExecutor
│
└── Uncertain (default) ── HandleUncertainExecutor对比 Conditional Edge 的优势:
- 一个方法调用即可声明所有分支,代码更紧凑
- 可选的
defaultCase处理"其他"情况,避免静默丢弃 - 运行时按声明顺序匹配,第一个命中的分支执行
Fan-out Edge:并行扇出
核心需求:"查订单、查物流、查用户信用——同时查,互不等待。"
MAF 的 Superstep 模型天然支持并行:同一超步内的所有 Executor 并发执行。
方式一:多 Targets
// orderLookup 完成后,消息同时发往三个目标
builder.AddEdge(orderLookup, creditCheck);
builder.AddEdge(orderLookup, logisticsCheck);
builder.AddEdge(orderLookup, fraudCheck);由于三个目标同属一个超步,它们并发执行:
┌── CreditCheck
│
OrderLookup ───┼── LogisticsCheck ← 这三个在同一超步并行
│
└── FraudCheck方式二:AddFanOutEdge 显式声明
builder.AddFanOutEdge(orderLookup,
creditCheck, logisticsCheck, fraudCheck);效果同上,但意图更清晰。
Fan-in Edge (Barrier):同步屏障
核心需求:"等三个并行查询都做完,然后汇总结果。"
用 AddFanInBarrierEdge:
builder.AddFanInBarrierEdge(
sources: [creditCheck, logisticsCheck, fraudCheck],
target: aggregator);在 aggregator 中接收的是聚合后的消息数组:
[MessageHandler]
private async ValueTask<RefundDecision> HandleAsync(
// 注意:Fan-in 接收的是数组
object?[] messages,
IWorkflowContext context,
CancellationToken ct)
{
var creditResult = messages.OfType<CreditResult>().First();
var logisticsResult = messages.OfType<LogisticsResult>().First();
var fraudResult = messages.OfType<FraudResult>().First();
return new RefundDecision
{
CanProceed = creditResult.Score > 600
&& !fraudResult.IsSuspicious,
Assessment = $"信用:{creditResult.Score}/物流:{logisticsResult.Status}/风控:{fraudResult.IsSuspicious}"
};
}完整并行 + 汇聚流程图:
┌── CreditCheck ────┐
│ │
OrderLookup ───┼── LogisticsCheck ──┼── Fan-in Barrier ── Aggregator
│ │
└── FraudCheck ─────┘Fan-in 的关键行为
- 等待所有源 Executor 完成才触发目标 Executor。任何一个没完成,Aggregator 都不会执行。
- 自动按类型过滤:目标 Executor 只收到匹配其
[MessageHandler]参数类型的消息。 - 不支持条件:Fan-in 不能加
condition参数——它是一个同步屏障,不是路由。 - 超步同步:由于 Superstep 自带同步屏障,同一超步内的所有 Executor 完成后才进入下一超步——Fan-in 利用这个机制实现等待。
Superstep 执行模型:理解并行和阻断的根源
MAF Workflows 使用改进的 Pregel 模型(Bulk Synchronous Parallel,BSP)。
Superstep N:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 收集所有待处理 │───▶│ 按类型和条件路由 │───▶│ 并行执行所有 │
│ 消息 │ │ 到目标 Executor │ │ 目标 Executor │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
│ 同步屏障:等待全部完成
┌─────────────────┐ ┌─────────────────┐ │
│ 开始下一超步 │◀───│ 发射事件和新消息 │◀────────────┘
└─────────────────┘ └─────────────────┘同步屏障的影响
超步 1:
OrderLookup ──────────────┐
├── (超步同步, 等全部完成)
CreditCheck ──────────────┘
超步 2:
RefundCheck ─────────────────── (只有 OrderLookup 完成后才执行)这解释了:
- 并行:同一超步内的所有 Executor 并发执行(Fan-out 的原理)
- 阻断:下一超步必须等当前超步全部完成才能开始。如果 Fan-out 了一个长链和一个短链,长链没完成前短链不会继续
- 跳过:Conditional Edge 条件不满足时目标 Executor 不会收到消息,自然就"跳过"了
如果你需要"真正的独立并行"
如果两条路径完全独立、不想互相阻塞,把每条路径合并到单个 Executor 中:
// 不要这样:step1 → step2 (并行链1) 和 step3 (并行链2) ——它们互相阻塞
builder.AddEdge(source, step1);
builder.AddEdge(source, step3);
builder.AddEdge(step1, step2); // step2 要等 step3 完成
// 应该这样:把 step1+step2 合并为一个 Executor
builder.AddEdge(source, compositeStep1And2);
builder.AddEdge(source, step3);
// 现在两者在同一个超步,互不阻塞共享状态:跨 Executor 传递数据
这是条件分支和结果驱动流的核心基础设施。IWorkflowContext 提供了跨 Executor 的共享状态:
// Executor A:写入共享状态
await context.QueueStateUpdateAsync(
key: $"order-{orderId}", // 唯一键
value: orderInfo, // 任意对象
scopeName: "RefundScope"); // 作用域
// Executor B:读取共享状态
var order = await context.ReadStateAsync<OrderInfo>(
key: $"order-{orderId}",
scopeName: "RefundScope");共享状态的典型使用模式
Executor A (查订单)
├── QueueStateUpdateAsync("order-123", orderInfo, "RefundScope")
├── 返回 OrderInfo 消息 → 流转到 Executor B
│
Executor B (验资格)
├── 接收 OrderInfo 消息(消息本身携带部分数据)
├── ReadStateAsync("order-123", "RefundScope") 获取更多数据
├── 写评估结果 QueueStateUpdateAsync(...)
└── 返回 EligibilityResult → 根据条件分流Message 传递 vs Shared State
| 维度 | Message 传递 | Shared State |
|---|---|---|
| 数据量 | 小(典型 < 10KB) | 大 |
| 生命周期 | 单次 Edge 流转 | 整个 Workflow 周期 |
| 可见性 | 仅目标 Executor | 任意 Executor,按 scope 访问 |
| 适合场景 | 关键业务数据、决定路由的数据 | 参考数据、大对象、日志 |
完整实战:多 Agent 退款工作流
综合以上所有模式,构建一个真实的退款处理工作流:
┌── (eligible) ── RefundAgent ───┐
│ │
OrderLookupAgent ────┤ ├── Fan-in ── NotifyAgent
│ │
└── (ineligible) ── RejectAgent ───┘
↑
(并行查询结果)
┌── CreditCheckAgent ──────────────┘
│
└── FraudCheckAgent ───────────────┘Step 1:定义数据模型
public class OrderInfo
{
public string OrderId { get; set; } = "";
public decimal Amount { get; set; }
public DateTime CreatedAt { get; set; }
}
public class EligibilityResult
{
public bool IsEligible { get; set; }
public string Reason { get; set; } = "";
}
public class RefundResult
{
public bool Success { get; set; }
public string RefundId { get; set; } = "";
}
public class CreditScore
{
public int Score { get; set; }
public bool IsFraudSuspicious { get; set; }
}Step 2:实现 Executor
// 查订单 Agent
internal sealed partial class OrderLookupExecutor : Executor
{
private readonly AIAgent _agent;
public OrderLookupExecutor(AIAgent agent) : base("OrderLookup")
{
_agent = agent;
}
[MessageHandler]
private async ValueTask<OrderInfo> HandleAsync(
ChatMessage message,
IWorkflowContext context,
CancellationToken ct)
{
var agentResponse = await _agent.RunAsync(message, ct);
var order = JsonSerializer.Deserialize<OrderInfo>(agentResponse.Text)!;
// 存到共享状态
await context.QueueStateUpdateAsync(
$"order-{order.OrderId}", order, scopeName: "RefundScope");
return order;
}
}
// 信用检查 Agent(并行执行之一)
internal sealed partial class CreditCheckExecutor : Executor
{
private readonly AIAgent _agent;
public CreditCheckExecutor(AIAgent agent) : base("CreditCheck")
{
_agent = agent;
}
[MessageHandler]
private async ValueTask<CreditScore> HandleAsync(
OrderInfo message,
IWorkflowContext context,
CancellationToken ct)
{
// 从共享状态读取订单数据
var order = await context.ReadStateAsync<OrderInfo>(
$"order-{message.OrderId}", "RefundScope");
var response = await _agent.RunAsync(
$"评估用户信用,订单金额 {order.Amount}", ct);
return JsonSerializer.Deserialize<CreditScore>(response.Text)!;
}
}
// ... FraudCheckExecutor 类似
// 聚合器:汇总并行结果
internal sealed partial class RefundAggregatorExecutor : Executor
{
public RefundAggregatorExecutor() : base("RefundAggregator")
{
}
[MessageHandler]
private async ValueTask<EligibilityResult> HandleAsync(
object?[] messages, // Fan-in 接受数组
IWorkflowContext context,
CancellationToken ct)
{
var creditScore = messages.OfType<CreditScore>().FirstOrDefault();
var fraudResult = messages.OfType<FraudResult>().FirstOrDefault();
if (fraudResult?.IsSuspicious == true)
{
return new EligibilityResult
{
IsEligible = false,
Reason = "风控检测异常,拒绝退款"
};
}
if (creditScore?.Score < 500)
{
return new EligibilityResult
{
IsEligible = false,
Reason = $"信用分过低({creditScore.Score}),拒绝退款"
};
}
return new EligibilityResult { IsEligible = true };
}
}Step 3:用 WorkflowBuilder 编排
// 创建 Executor
var orderLookup = new OrderLookupExecutor(orderAgent);
var creditCheck = new CreditCheckExecutor(creditAgent);
var fraudCheck = new FraudCheckExecutor(fraudAgent);
var aggregator = new RefundAggregatorExecutor();
var refund = new RefundProcessExecutor(refundAgent);
var reject = new RefundRejectExecutor(rejectAgent);
var notify = new NotifyExecutor();
// 构建工作流
WorkflowBuilder builder = new(orderLookup);
// Step 1: 查订单后并行查信用和风控
builder.AddEdge(orderLookup, creditCheck);
builder.AddEdge(orderLookup, fraudCheck);
// Step 2: Fan-in 等待并行结果 → 聚合判断资格
builder.AddFanInBarrierEdge(
sources: [creditCheck, fraudCheck],
target: aggregator);
// Step 3: 条件路由——根据聚合结果分流
builder.AddEdge(aggregator, refund,
condition: msg => msg is EligibilityResult r && r.IsEligible);
builder.AddEdge(aggregator, reject,
condition: msg => msg is EligibilityResult r && !r.IsEligible);
// Step 4: 无论退款成功还是拒绝,都通知用户
builder.AddEdge(refund, notify);
builder.AddEdge(reject, notify);
// 输出
builder.WithOutputFrom(notify);
// 构建
var workflow = builder.Build();Step 4:执行并观测事件
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(
workflow, new ChatMessage(ChatRole.User, "处理退款 ORD-2024-001"));
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorStartedEvent start:
Console.WriteLine($"[开始] {start.ExecutorId}");
break;
case ExecutorCompletedEvent complete:
Console.WriteLine($"[完成] {complete.ExecutorId}: {complete.Data}");
break;
case ExecutorSkippedEvent skipped:
// 条件不满足时 Executor 被跳过
Console.WriteLine($"[跳过] {skipped.ExecutorId}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($"[输出] {output.Data}");
break;
}
}可能的事件输出:
[开始] OrderLookup
[完成] OrderLookup: 订单找到,金额 2999,下单时间 2024-06-01
[开始] CreditCheck ← 并行
[开始] FraudCheck ← 并行
[完成] FraudCheck: 风控正常
[完成] CreditCheck: 信用分 720
[开始] RefundAggregator ← Fan-in 等待两个都完成才触发
[完成] RefundAggregator: 合格
[开始] RefundProcess
[完成] RefundProcess: 退款成功 #RMA-2024-001
[开始] NotifyExecutor
[完成] NotifyExecutor: 已通知用户
[输出] 退款已处理,编号 RMA-2024-001ExecutorSkippedEvent:跳过的处理
当条件边不匹配时,目标 Executor 不会被触发,同时会触发 ExecutorSkippedEvent。
但这不意味着工作流出错——消息被静默丢弃。如果你需要"条件不满足时走默认路径",有两种做法:
方式一:明确的两条 Conditional Edge
// 一条 for 满足条件,一条 for 不满足
builder.AddEdge(aggregator, refund,
condition: msg => msg is EligibilityResult r && r.IsEligible);
builder.AddEdge(aggregator, reject,
condition: msg => msg is EligibilityResult r && !r.IsEligible);方式二:用 SwitchCase 的 defaultCase
builder.AddSwitchCase(source,
(GetCondition(Decision.Approve), approveHandler),
(GetCondition(Decision.Reject), rejectHandler),
defaultCase: escalateHandler // 既不是 Approve 也不是 Reject
);跨超步结果驱动
如果你需要"根据上一步的结果决定下一步的参数",这完全可以通过Message 类型转换和Shared State实现:
// Executor A 输出 EligibilityResult
// Executor A 也在 Shared State 存了完整的 OrderInfo
// Conditional Edge 根据 EligibilityResult.IsEligible 路由
builder.AddEdge(aggregator, refund,
condition: msg => msg is EligibilityResult r && r.IsEligible);
// Executor B 接收 EligibilityResult,然后通过 Shared State 取更多数据
[MessageHandler]
private async ValueTask<RefundResult> HandleAsync(
EligibilityResult message,
IWorkflowContext context,
CancellationToken ct)
{
// 从共享状态获取原始订单数据
var order = await context.ReadStateAsync<OrderInfo>(
"order-ORD-2024-001", "RefundScope");
// 根据订单信息执行退款
return await ProcessRefund(order, message);
}工作流 vs Agent 自主调用
| 场景 | 推荐方式 |
|---|---|
| 步骤顺序确定,不能调换 | Workflow(图或函数式) |
| 步骤顺序不确定,需动态决策 | Agent 自主调用(工具调用) |
| 需要并行处理 | Workflow Fan-out |
| 需要人工审批介入 | Workflow HITL (Human-in-the-Loop) |
| 简单的两步骤 | 可以直接用 Agent 链式调用 |
| 需要 checkpoint 恢复 | Workflow(超步边界自动 checkpoint) |
速查表
| 需求 | API |
|---|---|
| 顺序执行 | .AddEdge(a, b) |
| 条件路由 (if/else) | .AddEdge(a, b, condition: fn) |
| 多分支 | .AddSwitchCase(source, (cond, target), ..., defaultCase:) |
| 并行 | .AddFanOutEdge(source, target1, target2, ...) |
| 同步等待 | .AddFanInBarrierEdge(sources: [...], target:) |
| 跳过 | 条件不满足时自动跳过; 用 ExecutorSkippedEvent 观测 |
| 阻断 | Superstep 同步屏障天然阻断 |
| 传递数据 | Message(小数据)+ Shared State(大数据) |
| 读取前序结果 | context.ReadStateAsync<T>(key, scope) |
| 写入中间结果 | context.QueueStateUpdateAsync(key, value, scope) |
本文内容基于 Microsoft Learn - Agent Framework Workflows 官方文档编写。
下一步:Step 6:托管部署 — 通过 ASP.NET Core 将 Agent 暴露为服务。