贝利信息

Reactive Stream 中如何正确合并多个 Flux 数据流

日期:2026-01-13 00:00 / 作者:花韻仙語

在 spring webflux 或 project reactor 中,使用 `mergewith` 时需注意其不可变性——它不会原地修改流,而是返回新流;错误地忽略返回值会导致数据丢失,正确做法是用 `flatmap` 或链式 `fold` 累积合并。

在响应式编程中,常见的误区之一是将命令式思维(如 for 循环 + 累加变量)直接套用于 Reactor 的 Flux 操作。你提供的代码:

val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
    val events: Flux = eventStore.readEvents(id)
    allEventFlux.mergeWith(events) // ❌ 错误:返回新 Flux,但未赋值!
}

问题核心在于:mergeWith 是纯函数式操作,返回一个全新的 Flux,而非修改原流。因此 allEventFlux.mergeWith(events) 执行后,结果被丢弃,allEventFlux 始终保持为初始的空流 Flux.empty()。

✅ 正确方案

一:推荐使用 flatMap(语义清晰、性能友好)

val allEvents: Flux = Flux.fromIterable(repository.findIds())
    .map { it.ekycId }
    .flatMap { id -> eventStore.readEvents(id) }

✅ 正确方案二:若需严格顺序合并(如 mergeWith 语义),用 fold

val ids = repository.findIds().map { it.ekycId }
val allEvents: Flux = ids.fold(Flux.empty()) { acc, id ->
    acc.mergeWith(eventStore.readEvents(id))
}
val allEventsInOrder: Flux = ids.fold(Flux.empty()) { acc, id ->
    acc.concatWith(eventStore.readEvents(id)) // ✅ 严格串行:ID1 → ID2 → ...
}

⚠️ 注意事项

总结

场景 推荐操作符 特点
高吞吐、事件无需严格 ID 顺序 flatMap 并发执行,自动背压,最常用
各 ID 事件需严格串行输出 concatWith(配合 fold) 顺序执行,延迟高,适合强序要求
多流静态合并(已知固定数量) Flux.merge(flux1, flux2, flux3) 更直观,适用于编译期确定流数

始终牢记:Reactor 是声明式、不可变的响应式流模型——每一次操作都在定义“未来如何处理数据”,而非立即执行。 正确理解这一范式,是写出健壮响应式代码的前提。