贝利信息

c# 在 C# 中实现事务性发件箱(Transactional Outbox)模式

日期:2026-01-23 00:00 / 作者:畫卷琴夢
直接用数据库事务发消息会出问题,因为SaveChanges()后调用消息发送若失败,业务已提交但消息丢失,破坏一致性;Transactional Outbox通过将消息写入同事务的outbox表,再由独立幂等投递器轮询发送来解决。

为什么直接用数据库事务发消息会出问题

在 C# 应用中,你可能写过类似这样的代码:SaveChanges() 之后立刻调用 bus.Publish()producer.SendAsync()。表面看是“先存库再发消息”,但一旦消息发送失败(网络抖动、Broker 不可用、序列化异常),业务已提交,消息却丢了——违反了“要么都成功,要么都不发生”的一致性要求。

Transactional Outbox 的核心思路是:把要发的消息也当作业务数据,写进同一个数据库事务里。消息不是“发出去”,而是“记下来”,后续由一个独立的、幂等的投递器(Outbox Processor)去轮询并转发。

如何在 Entity Framework Core 中建 outbox 表并自动写入

你需要一张 OutboxMessages 表,字段至少包含:Id(GUID)、TypeName(事件全名)、Content(JSON 字符串)、ProcessedAt(NULL 表示未处理)、CreatedAt。关键在于:它必须和你的业务实体共享同一个 DbContext 实例,并在同一个 SaveChanges() 中被插入。

推荐做法是封装一个 OutboxService,在业务逻辑中调用 AddOutboxMessage(T @event),内部只是 new 一个 OutboxMessagecontext.OutboxMessages.Add()。EF Core 会把它当成普通实体参与事务。

public class OutboxMessage
{

public Guid Id { get; set; } public string TypeName { get; set; } = null!; public string Content { get; set; } = null!; public DateTime CreatedAt { get; set; } = DateTime.UtcNow; public DateTime? ProcessedAt { get; set; } }

怎么安全地轮询并投递 outbox 消息

投递器不能和业务应用跑在同一个进程里(否则进程崩溃会导致消息丢失),建议作为独立后台服务(如 .NET Worker Service),或用 Quartz.NET / Hangfire 定时触发。每次只取少量(例如 100 条)ProcessedAt IS NULL 的记录,按 CreatedAt 排序,逐条尝试发送到消息队列(如 RabbitMQ、Kafka)。

重点在于“发送成功后才更新 ProcessedAt”——这步更新也必须走数据库事务,且必须是**同一个数据库连接**(不能新开 DbContext)。否则会出现消息已发、但 DB 更新失败,导致重复投递。

常见坑:序列化、重试、幂等性怎么处理

Outbox 表里的 Content 是 JSON,必须保证序列化前后完全一致。别用 System.Text.Json 默认设置——它会忽略 null 字段、按字母序排序属性。务必显式配置 JsonSerializerOptions,并全局复用同一实例:

var options = new JsonSerializerOptions
{
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    WriteIndented = false
};

投递失败后的重试天然带来重复风险。解决方案不在 outbox 层,而在消费者端:每条消息带唯一 MessageId(通常就是 outbox 表的 Id),消费者需维护已处理 ID 的去重表(或 Redis Set),收到重复 ID 直接丢弃。

最易被忽略的是:投递器的数据库连接字符串是否启用了连接池?是否设置了合理的 Max Pool Size?高吞吐下连接耗尽会导致投递停滞,而业务库仍在持续写入 outbox,最终填满磁盘。