贝利信息

如何使用Golang实现并发任务分组_Golang worker pool与任务调度实践

日期:2026-01-14 00:00 / 作者:P粉602998670
直接用go启动大量goroutine易崩,因内存暴涨、调度器过载、OOM;worker pool通过固定worker数、任务队列、复用执行者实现可控并发,50行内可构建生产级池子。

为什么直接用 go 启动大量 goroutine 容易崩

不是 goroutine 本身贵,而是没节制地创建会导致内存暴涨、调度器过载、甚至触发 OOM。比如读取 10 万行日志并逐行解析,如果每行都 go parseLine(line),瞬间可能起 10 万个 goroutine,而多数任务实际是 I/O 等待或 CPU 轻量计算,根本不需要这么多并发单元。

真正需要的是可控的并发度 + 任务排队 + 复用执行者。这正是 worker pool 的核心价值:用固定数量的长期运行 goroutine 消费任务队列,避免资源抖动。

chan + for range 实现最简 worker pool

不依赖第三方库,50 行内可搭出生产可用的池子。关键在于任务通道类型

定义、worker 死循环消费、以及主协程关闭信号的传递方式。

下面是最小可行示例,支持优雅关闭:

type Task func()
type WorkerPool struct {
    tasks chan Task
    done  chan struct{}
}

func NewWorkerPool(workerCount int) *WorkerPool { return &WorkerPool{ tasks: make(chan Task, 100), // 缓冲区防主协程阻塞 done: make(chan struct{}), } }

func (wp *WorkerPool) Start() { for i := 0; i < cap(wp.tasks); i++ { go wp.worker() } }

func (wp *WorkerPool) worker() { for { select { case task := <-wp.tasks: task() case <-wp.done: return } } }

func (wp *WorkerPool) Submit(task Task) { wp.tasks <- task }

func (wp *WorkerPool) Shutdown() { close(wp.done) }

注意:cap(wp.tasks) 是通道容量,不是 worker 数量 —— 这里故意写错来提醒你:worker 数量应独立传入,别和通道容量混用。

sync.WaitGroupcontext.Context 哪个更适合控制生命周期

WaitGroup 只解决“等所有任务结束”,不解决“中途取消”;Context 支持取消、超时、值传递,但需每个 worker 主动检查 ctx.Done()。二者常组合使用。

任务分组的实际约束:如何让同组任务串行执行

常见需求:用户 A 的 10 个订单更新必须按顺序处理,但用户 B 的订单可与 A 并发。这不是靠加锁能解的 —— 锁会全局串行化,违背并发初衷。

正确做法是哈希分组 + 每组独占一个 channel:

分组逻辑一旦写死,就很难动态扩缩容。线上遇到热点 key(如大 V 用户请求暴增),单 worker 会成为瓶颈,此时需要二级分片或降级为异步重试。