Skip to content

Step 5:工作流编排

适用读者:需要构建多步骤、多 Agent 自动化流程的开发者 前置知识:理解 AIAgent 和基本工具调用 本文目标:从顺序链到并行/条件/扇入/切换,彻底掌握 MAF Workflows 的控制流能力


单个 Agent 的单次调用能做的事情有限。真实业务中往往是多步骤的:

"帮我处理退款,订单号 ORD-2024-001"

背后可能是:查订单 → 验证退款资格 → 计算金额 → 执行退款 → 发通知 → 记入 CRM。而且中间可能有分支——如果订单不符合退款条件,要跳到拒绝流程;还要并行——查订单和查物流同时进行;还要根据上一个步骤的结果决定下一步怎么走。


工作流核心概念

Microsoft Agent Framework Workflows 提供了一套完整的有向图编排引擎,核心组件:

概念说明
Executor处理单元(可以是 AI Agent 或自定义逻辑)
Edge连接 Executor 的路径,决定消息流向
WorkflowBuilder用 Fluent API 构建有向图
InProcessExecution执行工作流,支持流式和阻塞模式
Superstep执行的基本单位——一个超步内所有 Executor 并行执行,之后同步等待
Shared State跨 Executor 的共享状态,用于传递中间结果

两种 API 的选择

维度函数式 API图形 API (WorkflowBuilder)
控制流原生 C#(if/for/await)Edges 和 Conditions
适合场景顺序流水线、自定义循环固定图、扇出/扇入、类型校验路由
并行Task.WhenAll并行边组 + Superstep
人机协同直接代码RequestInfoExecutor
Checkpoint每 @step超步边界

一句话建议:如果工作流拓扑固定且类型路由复杂,用图形 API;如果需要动态条件或灵活循环,用函数式 API。两者可以混合。


Executor:工作流的基本单元

Executor 是工作流的处理单元,有两种定义方式。

方式一:继承 Executor 基类 + [MessageHandler]

这是推荐的编译时源码生成方式:

csharp
using Microsoft.Agents.AI.Workflows;

// 定义注意:类必须是 partial,方法加 [MessageHandler] 属性
internal sealed partial class OrderLookupExecutor : Executor
{
    public OrderLookupExecutor() : base("OrderLookup") { }

    // 方法签名必须为: private async ValueTask<TOutput> HandleAsync(
    //     TInput message, IWorkflowContext context, CancellationToken)
    [MessageHandler]
    private async ValueTask<OrderInfo> HandleAsync(
        ChatMessage message,
        IWorkflowContext context,
        CancellationToken cancellationToken)
    {
        // 用 AI Agent 或直接逻辑处理
        var orderInfo = await LookupOrderAsync(message.Text);
        return orderInfo;
    }
}

方式二:BindAsExecutor 扩展方法

适合把已有的简单函数包装为 Executor:

csharp
Func<string, string> uppercase = s => s.ToUpperInvariant();
var uppercaseExecutor = uppercase.BindAsExecutor("ToUpper");

Agent Executor

把 AI Agent 包装为 Executor 非常直接——使用 AgentExecutor 或直接在 [MessageHandler] 中调用 Agent:

csharp
internal sealed partial class RefundEligibilityExecutor : Executor
{
    private readonly AIAgent _agent;

    public RefundEligibilityExecutor(AIAgent agent) : base("CheckEligibility")
    {
        _agent = agent;
    }

    [MessageHandler]
    private async ValueTask<EligibilityResult> HandleAsync(
        OrderInfo message,
        IWorkflowContext context,
        CancellationToken ct)
    {
        var response = await _agent.RunAsync(
            $"判断订单 {message.OrderId} 的退款资格", ct);

        // 把结果写到共享状态,供后续 Executor 读取
        await context.QueueStateUpdateAsync(
            $"eligibility-{message.OrderId}",
            new EligibilityResult { /* ... */ },
            scopeName: "RefundScope");

        return JsonSerializer.Deserialize<EligibilityResult>(response.Text);
    }
}

Edge:控制流的灵魂

Edge 定义了消息如何在 Executor 之间流动。MAF Workflows 支持 5 种 Edge 类型

类型说明使用场景
Direct一对一连接,无条件顺序流水线
Conditional带条件的边,满足条件才流转if/else 二分路由
Switch-Case多分支路由三分及以上路由(含 default)
Fan-out (Multi-Selection)一个源发往多个目标并行处理
Fan-in (Barrier)多个源汇聚到一个目标聚合/等待所有并行完成

下面逐一演示。


Direct Edge:顺序链

最简单的形式——一个接一个执行:

csharp
WorkflowBuilder builder = new(orderLookup);
builder.AddEdge(orderLookup, refundCheck);   // 查订单 → 验资格
builder.AddEdge(refundCheck, processRefund); // 验资格 → 处理退款
builder.AddEdge(processRefund, notifyUser);  // 处理退款 → 通知用户

builder.WithOutputFrom(notifyUser);
var workflow = builder.Build();

流程图:

OrderLookup ──→ RefundCheck ──→ ProcessRefund ──→ NotifyUser ──→ (输出)

Conditional Edge:条件分支(if/else)

核心需求:"根据上一个 Executor 的结果,决定走哪条路"——Conditional Edge 就是为此而生。

场景:根据订单验证结果决定走"退款"还是"拒绝"

定义条件函数:

csharp
// 条件工厂:返回一个判断消息是否符合预期的函数
private static Func<object?, bool> GetCondition(bool expectedEligible) =>
    message => message is EligibilityResult result
               && result.IsEligible == expectedEligible;

用条件连接 Edge:

csharp
// 第一个参数:源 Executor
// 第二个参数:目标 Executor
// 第三个参数(可选):条件函数,返回 true 时才流转
WorkflowBuilder builder = new(orderLookup);

// 符合条件的边:isEligible == true → 走退款流程
builder.AddEdge(orderLookup, processRefund,
    condition: GetCondition(expectedEligible: true));

// 不符合条件的边:isEligible == false → 走拒绝流程
builder.AddEdge(orderLookup, rejectRefund,
    condition: GetCondition(expectedEligible: false));

builder.WithOutputFrom(processRefund, rejectRefund);

完整流程图:

         ┌─── (isEligible=true) ─── ProcessRefund ────┐
         │                                              │
OrderLookup                                             ├──→ 输出
         │                                              │
         └─── (isEligible=false) ── RejectRefund ───────┘

⚡ 关键设计点

  1. 条件是独立的 Edge 属性,不是 Executor 的一部分。一个 Executor 可以有多条出边,每条有自己的条件。
  2. Type-safe 求值:条件函数接收 object?,第一件事就是类型转换并检查属性。不匹配的消息不会引发异常,只是条件返回 false。
  3. 无匹配 = 消息丢弃:如果所有出边的条件都返回 false,该消息被静默丢弃(不会报错,但也不会转发)。
  4. 多条出边可命中:如果多个条件同时为 true,消息会复制发给所有目标 Executor(实现扇出)。

Switch-Case Edge:多分支路由

当分支超过 2 条时,用多个 Conditional Edge 会显得啰嗦。Switch-Case 提供更简洁的语法。

场景:邮件分类——垃圾/正常/不确定

定义枚举和条件工厂:

csharp
public enum SpamDecision { NotSpam, Spam, Uncertain }

private static Func<object?, bool> GetCondition(SpamDecision expected) =>
    message => message is DetectionResult r && r.SpamDecision == expected;

AddSwitchCase 构建多分支:

csharp
// 构建 Switch 路由:一个源 → 三个目标
builder.AddSwitchCase(spamDetector,
    (GetCondition(SpamDecision.NotSpam), emailAssistant),
    (GetCondition(SpamDecision.Spam), handleSpam),
    defaultCase: handleUncertain // 可选默认路由
);
              ┌── NotSpam ─── EmailAssistant ── EmailResponseExecutor

SpamDetector ──┼── Spam ───── HandleSpamExecutor

              └── Uncertain (default) ── HandleUncertainExecutor

对比 Conditional Edge 的优势

  • 一个方法调用即可声明所有分支,代码更紧凑
  • 可选的 defaultCase 处理"其他"情况,避免静默丢弃
  • 运行时按声明顺序匹配,第一个命中的分支执行

Fan-out Edge:并行扇出

核心需求:"查订单、查物流、查用户信用——同时查,互不等待。"

MAF 的 Superstep 模型天然支持并行:同一超步内的所有 Executor 并发执行

方式一:多 Targets

csharp
// orderLookup 完成后,消息同时发往三个目标
builder.AddEdge(orderLookup, creditCheck);
builder.AddEdge(orderLookup, logisticsCheck);
builder.AddEdge(orderLookup, fraudCheck);

由于三个目标同属一个超步,它们并发执行

               ┌── CreditCheck

OrderLookup ───┼── LogisticsCheck    ← 这三个在同一超步并行

               └── FraudCheck

方式二:AddFanOutEdge 显式声明

csharp
builder.AddFanOutEdge(orderLookup,
    creditCheck, logisticsCheck, fraudCheck);

效果同上,但意图更清晰。


Fan-in Edge (Barrier):同步屏障

核心需求:"等三个并行查询都做完,然后汇总结果。"

AddFanInBarrierEdge

csharp
builder.AddFanInBarrierEdge(
    sources: [creditCheck, logisticsCheck, fraudCheck],
    target: aggregator);

aggregator 中接收的是聚合后的消息数组

csharp
[MessageHandler]
private async ValueTask<RefundDecision> HandleAsync(
    // 注意:Fan-in 接收的是数组
    object?[] messages,
    IWorkflowContext context,
    CancellationToken ct)
{
    var creditResult = messages.OfType<CreditResult>().First();
    var logisticsResult = messages.OfType<LogisticsResult>().First();
    var fraudResult = messages.OfType<FraudResult>().First();

    return new RefundDecision
    {
        CanProceed = creditResult.Score > 600
                     && !fraudResult.IsSuspicious,
        Assessment = $"信用:{creditResult.Score}/物流:{logisticsResult.Status}/风控:{fraudResult.IsSuspicious}"
    };
}

完整并行 + 汇聚流程图:

               ┌── CreditCheck ────┐
               │                    │
OrderLookup ───┼── LogisticsCheck ──┼── Fan-in Barrier ── Aggregator
               │                    │
               └── FraudCheck ─────┘

Fan-in 的关键行为

  1. 等待所有源 Executor 完成才触发目标 Executor。任何一个没完成,Aggregator 都不会执行。
  2. 自动按类型过滤:目标 Executor 只收到匹配其 [MessageHandler] 参数类型的消息。
  3. 不支持条件:Fan-in 不能加 condition 参数——它是一个同步屏障,不是路由。
  4. 超步同步:由于 Superstep 自带同步屏障,同一超步内的所有 Executor 完成后才进入下一超步——Fan-in 利用这个机制实现等待。

Superstep 执行模型:理解并行和阻断的根源

MAF Workflows 使用改进的 Pregel 模型(Bulk Synchronous Parallel,BSP)。

Superstep N:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  收集所有待处理  │───▶│  按类型和条件路由 │───▶│  并行执行所有    │
│  消息            │    │  到目标 Executor │    │  目标 Executor   │
└─────────────────┘    └─────────────────┘    └─────────────────┘

                                                       │ 同步屏障:等待全部完成
┌─────────────────┐    ┌─────────────────┐             │
│  开始下一超步    │◀───│  发射事件和新消息 │◀────────────┘
└─────────────────┘    └─────────────────┘

同步屏障的影响

超步 1:
OrderLookup ──────────────┐
                          ├── (超步同步, 等全部完成)
CreditCheck ──────────────┘

超步 2:
RefundCheck ─────────────────── (只有 OrderLookup 完成后才执行)

这解释了

  • 并行:同一超步内的所有 Executor 并发执行(Fan-out 的原理)
  • 阻断:下一超步必须等当前超步全部完成才能开始。如果 Fan-out 了一个长链和一个短链,长链没完成前短链不会继续
  • 跳过:Conditional Edge 条件不满足时目标 Executor 不会收到消息,自然就"跳过"了

如果你需要"真正的独立并行"

如果两条路径完全独立、不想互相阻塞,把每条路径合并到单个 Executor 中:

csharp
// 不要这样:step1 → step2 (并行链1) 和 step3 (并行链2) ——它们互相阻塞
builder.AddEdge(source, step1);
builder.AddEdge(source, step3);
builder.AddEdge(step1, step2); // step2 要等 step3 完成

// 应该这样:把 step1+step2 合并为一个 Executor
builder.AddEdge(source, compositeStep1And2);
builder.AddEdge(source, step3);
// 现在两者在同一个超步,互不阻塞

共享状态:跨 Executor 传递数据

这是条件分支和结果驱动流的核心基础设施。IWorkflowContext 提供了跨 Executor 的共享状态:

csharp
// Executor A:写入共享状态
await context.QueueStateUpdateAsync(
    key: $"order-{orderId}",   // 唯一键
    value: orderInfo,           // 任意对象
    scopeName: "RefundScope");  // 作用域

// Executor B:读取共享状态
var order = await context.ReadStateAsync<OrderInfo>(
    key: $"order-{orderId}",
    scopeName: "RefundScope");

共享状态的典型使用模式

Executor A (查订单)
  ├── QueueStateUpdateAsync("order-123", orderInfo, "RefundScope")
  ├── 返回 OrderInfo 消息 → 流转到 Executor B

Executor B (验资格)
  ├── 接收 OrderInfo 消息(消息本身携带部分数据)
  ├── ReadStateAsync("order-123", "RefundScope") 获取更多数据
  ├── 写评估结果 QueueStateUpdateAsync(...)
  └── 返回 EligibilityResult → 根据条件分流

Message 传递 vs Shared State

维度Message 传递Shared State
数据量小(典型 < 10KB)
生命周期单次 Edge 流转整个 Workflow 周期
可见性仅目标 Executor任意 Executor,按 scope 访问
适合场景关键业务数据、决定路由的数据参考数据、大对象、日志

完整实战:多 Agent 退款工作流

综合以上所有模式,构建一个真实的退款处理工作流:

                     ┌── (eligible) ── RefundAgent ───┐
                     │                                  │
OrderLookupAgent ────┤                                  ├── Fan-in ── NotifyAgent
                     │                                  │
                     └── (ineligible) ── RejectAgent ───┘

                                   (并行查询结果)
                     ┌── CreditCheckAgent ──────────────┘

                     └── FraudCheckAgent ───────────────┘

Step 1:定义数据模型

csharp
public class OrderInfo
{
    public string OrderId { get; set; } = "";
    public decimal Amount { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class EligibilityResult
{
    public bool IsEligible { get; set; }
    public string Reason { get; set; } = "";
}

public class RefundResult
{
    public bool Success { get; set; }
    public string RefundId { get; set; } = "";
}

public class CreditScore
{
    public int Score { get; set; }
    public bool IsFraudSuspicious { get; set; }
}

Step 2:实现 Executor

csharp
// 查订单 Agent
internal sealed partial class OrderLookupExecutor : Executor
{
    private readonly AIAgent _agent;

    public OrderLookupExecutor(AIAgent agent) : base("OrderLookup")
    {
        _agent = agent;
    }

    [MessageHandler]
    private async ValueTask<OrderInfo> HandleAsync(
        ChatMessage message,
        IWorkflowContext context,
        CancellationToken ct)
    {
        var agentResponse = await _agent.RunAsync(message, ct);
        var order = JsonSerializer.Deserialize<OrderInfo>(agentResponse.Text)!;

        // 存到共享状态
        await context.QueueStateUpdateAsync(
            $"order-{order.OrderId}", order, scopeName: "RefundScope");

        return order;
    }
}

// 信用检查 Agent(并行执行之一)
internal sealed partial class CreditCheckExecutor : Executor
{
    private readonly AIAgent _agent;

    public CreditCheckExecutor(AIAgent agent) : base("CreditCheck")
    {
        _agent = agent;
    }

    [MessageHandler]
    private async ValueTask<CreditScore> HandleAsync(
        OrderInfo message,
        IWorkflowContext context,
        CancellationToken ct)
    {
        // 从共享状态读取订单数据
        var order = await context.ReadStateAsync<OrderInfo>(
            $"order-{message.OrderId}", "RefundScope");

        var response = await _agent.RunAsync(
            $"评估用户信用,订单金额 {order.Amount}", ct);

        return JsonSerializer.Deserialize<CreditScore>(response.Text)!;
    }
}

// ... FraudCheckExecutor 类似

// 聚合器:汇总并行结果
internal sealed partial class RefundAggregatorExecutor : Executor
{
    public RefundAggregatorExecutor() : base("RefundAggregator")
    {
    }

    [MessageHandler]
    private async ValueTask<EligibilityResult> HandleAsync(
        object?[] messages, // Fan-in 接受数组
        IWorkflowContext context,
        CancellationToken ct)
    {
        var creditScore = messages.OfType<CreditScore>().FirstOrDefault();
        var fraudResult = messages.OfType<FraudResult>().FirstOrDefault();

        if (fraudResult?.IsSuspicious == true)
        {
            return new EligibilityResult
            {
                IsEligible = false,
                Reason = "风控检测异常,拒绝退款"
            };
        }

        if (creditScore?.Score < 500)
        {
            return new EligibilityResult
            {
                IsEligible = false,
                Reason = $"信用分过低({creditScore.Score}),拒绝退款"
            };
        }

        return new EligibilityResult { IsEligible = true };
    }
}

Step 3:用 WorkflowBuilder 编排

csharp
// 创建 Executor
var orderLookup = new OrderLookupExecutor(orderAgent);
var creditCheck = new CreditCheckExecutor(creditAgent);
var fraudCheck = new FraudCheckExecutor(fraudAgent);
var aggregator = new RefundAggregatorExecutor();
var refund = new RefundProcessExecutor(refundAgent);
var reject = new RefundRejectExecutor(rejectAgent);
var notify = new NotifyExecutor();

// 构建工作流
WorkflowBuilder builder = new(orderLookup);

// Step 1: 查订单后并行查信用和风控
builder.AddEdge(orderLookup, creditCheck);
builder.AddEdge(orderLookup, fraudCheck);

// Step 2: Fan-in 等待并行结果 → 聚合判断资格
builder.AddFanInBarrierEdge(
    sources: [creditCheck, fraudCheck],
    target: aggregator);

// Step 3: 条件路由——根据聚合结果分流
builder.AddEdge(aggregator, refund,
    condition: msg => msg is EligibilityResult r && r.IsEligible);
builder.AddEdge(aggregator, reject,
    condition: msg => msg is EligibilityResult r && !r.IsEligible);

// Step 4: 无论退款成功还是拒绝,都通知用户
builder.AddEdge(refund, notify);
builder.AddEdge(reject, notify);

// 输出
builder.WithOutputFrom(notify);

// 构建
var workflow = builder.Build();

Step 4:执行并观测事件

csharp
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(
    workflow, new ChatMessage(ChatRole.User, "处理退款 ORD-2024-001"));

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorStartedEvent start:
            Console.WriteLine($"[开始] {start.ExecutorId}");
            break;

        case ExecutorCompletedEvent complete:
            Console.WriteLine($"[完成] {complete.ExecutorId}: {complete.Data}");
            break;

        case ExecutorSkippedEvent skipped:
            // 条件不满足时 Executor 被跳过
            Console.WriteLine($"[跳过] {skipped.ExecutorId}");
            break;

        case WorkflowOutputEvent output:
            Console.WriteLine($"[输出] {output.Data}");
            break;
    }
}

可能的事件输出

[开始] OrderLookup
[完成] OrderLookup: 订单找到,金额 2999,下单时间 2024-06-01
[开始] CreditCheck      ← 并行
[开始] FraudCheck       ← 并行
[完成] FraudCheck: 风控正常
[完成] CreditCheck: 信用分 720
[开始] RefundAggregator  ← Fan-in 等待两个都完成才触发
[完成] RefundAggregator: 合格
[开始] RefundProcess
[完成] RefundProcess: 退款成功 #RMA-2024-001
[开始] NotifyExecutor
[完成] NotifyExecutor: 已通知用户
[输出] 退款已处理,编号 RMA-2024-001

ExecutorSkippedEvent:跳过的处理

当条件边不匹配时,目标 Executor 不会被触发,同时会触发 ExecutorSkippedEvent

但这意味着工作流出错——消息被静默丢弃。如果你需要"条件不满足时走默认路径",有两种做法:

方式一:明确的两条 Conditional Edge

csharp
// 一条 for 满足条件,一条 for 不满足
builder.AddEdge(aggregator, refund,
    condition: msg => msg is EligibilityResult r && r.IsEligible);
builder.AddEdge(aggregator, reject,
    condition: msg => msg is EligibilityResult r && !r.IsEligible);

方式二:用 SwitchCase 的 defaultCase

csharp
builder.AddSwitchCase(source,
    (GetCondition(Decision.Approve), approveHandler),
    (GetCondition(Decision.Reject), rejectHandler),
    defaultCase: escalateHandler // 既不是 Approve 也不是 Reject
);

跨超步结果驱动

如果你需要"根据上一步的结果决定下一步的参数",这完全可以通过Message 类型转换Shared State实现:

csharp
// Executor A 输出 EligibilityResult
// Executor A 也在 Shared State 存了完整的 OrderInfo

// Conditional Edge 根据 EligibilityResult.IsEligible 路由
builder.AddEdge(aggregator, refund,
    condition: msg => msg is EligibilityResult r && r.IsEligible);

// Executor B 接收 EligibilityResult,然后通过 Shared State 取更多数据
[MessageHandler]
private async ValueTask<RefundResult> HandleAsync(
    EligibilityResult message,
    IWorkflowContext context,
    CancellationToken ct)
{
    // 从共享状态获取原始订单数据
    var order = await context.ReadStateAsync<OrderInfo>(
        "order-ORD-2024-001", "RefundScope");

    // 根据订单信息执行退款
    return await ProcessRefund(order, message);
}

工作流 vs Agent 自主调用

场景推荐方式
步骤顺序确定,不能调换Workflow(图或函数式)
步骤顺序不确定,需动态决策Agent 自主调用(工具调用)
需要并行处理Workflow Fan-out
需要人工审批介入Workflow HITL (Human-in-the-Loop)
简单的两步骤可以直接用 Agent 链式调用
需要 checkpoint 恢复Workflow(超步边界自动 checkpoint)

速查表

需求API
顺序执行.AddEdge(a, b)
条件路由 (if/else).AddEdge(a, b, condition: fn)
多分支.AddSwitchCase(source, (cond, target), ..., defaultCase:)
并行.AddFanOutEdge(source, target1, target2, ...)
同步等待.AddFanInBarrierEdge(sources: [...], target:)
跳过条件不满足时自动跳过; 用 ExecutorSkippedEvent 观测
阻断Superstep 同步屏障天然阻断
传递数据Message(小数据)+ Shared State(大数据)
读取前序结果context.ReadStateAsync<T>(key, scope)
写入中间结果context.QueueStateUpdateAsync(key, value, scope)

本文内容基于 Microsoft Learn - Agent Framework Workflows 官方文档编写。

下一步:Step 6:托管部署 — 通过 ASP.NET Core 将 Agent 暴露为服务。

学而不思则罔,思而不学则殆