第二部分 进阶用法-并发度控制
在基础用法中,我们很简单的实现了并发处理数据,但是随着数据量增多,隐藏的问题就会浮出水面。
并发操作数据库,随数据增加,goroutine增多,会出现部分routine invalid connection DB, 可能是超过了DB的连接数
并发调用下游API,随数据增加,goroutine增多,会触发下游API限流,导致部分请求失败
这个时候就需要进行并发度控制
一、并发度控制,基于sync和channel实现
实现原理:使用sync.WaitGroup和channel结合,先启动指定个数goroutine, 每个goroutine阻塞在channel口循环接收数据,直到channel关闭,核心是把数据分给多个goroutine,每个goroutine循环处理
1、描述
定义最大并发数maxConcurrency,启动maxConcurrency个goroutine, 每个goroutine阻塞等待着从channel取待执行的任务数据
2、实现
func DoTask(datas []Spinfo) { var wg = sync.WaitGroup{} var c = make(chan Spinfo, 1) var maxConcurrency = 2 for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer func() { if r := recover(); r!= nil { // add log } wg.Done() }() for { // c中有值,并且取到了ok才为true, 否则一直阻塞,只到c关闭 data, ok := <-c if !ok { return } Execute(data) } }() } for _, data := range datas { c <- data } close(c) wg.Wait() }
3、测试
同上
4、结果
=== RUN TestDoTask name: 滴滴出行; time: 1683710351 name: 阳光出行; time: 1683710351 name: T3出行; time: 1683710352 name: 曹操出行; time: 1683710352 --- PASS: TestDoTask (2.01s) PASS
二、并发度控制,基于errgroup和channel
实现原理:使用errgroup封装的sync.WaitGroup能力,本质和上面一样
1、描述
2、实现
import "golang.org/x/sync/errgroup" func DoTask(datas []Spinfo) { var g errgroup.Group var c = make(chan Spinfo, 1) var maxConcurrency = 2 for i := 0; i < maxConcurrency; i++ { g.Go(func() (err error) { for { data, ok := <-c if !ok { return err } Execute(data) } }) } for _, data := range datas { c <- data } close(c) g.Wait() }
3、测试
同上
4、结果
同上
三、并发度控制,基于errgroup实现
1、描述
errgroup包内部实现了并发控制
2、实现
func DoTask(datas []Spinfo) { var g errgroup.Group var maxConcurrency = 2 g.SetLimit(maxConcurrency) for _, data := range datas { data := data g.Go(func() (err error) { Execute(data) return err }) } g.Wait() }
3、测试
同上
4、结果
=== RUN TestDoTask name: 滴滴出行; time: 1683712103 name: 阳光出行; time: 1683712103 name: 曹操出行; time: 1683712104 name: T3出行; time: 1683712104 --- PASS: TestDoTask (2.01s) PASS
四、并发度控制,基于sync和channel实现二
1、描述
定义有缓冲区的channel, 缓冲区大小为最大并发数,初始创建maxConcurrency个goroutine, 处理完后,在创建新的goroutine
2、实现
func DoTask(datas []Spinfo) { var wg sync.WaitGroup var maxConcurrency = 2 var c = make(chan int64, maxConcurrency ) for _, data := range datas { wg.Add(1) c <- 1 data := data go func() { defer func() { <-c wg.Done() }() Execute(data) }() } wg.Wait() }
3、测试
同上
4、结果
同上
五、并发度控制,任务池实现
任务池pool实现【github上一百多行,实现的任务池,项目中用过,可以设置并发数,挺好用】
六、并发度控制,题外话
一开始没有控制并发数,突然有一天发现,并发请求触发了依赖接口的限流,当时还不知道有并发数控制的概念,临时处理是20个并发,休息一会,现在回头看之前的代码,忍不住偷笑,这大概就是进步吧,回头得找个时间优化下
1、休息的代码赏析
// ... wg := &sync.WaitGroup{} pageCount := int(math.Ceil(total.(float64) / float64(pagesize))) // 如果有多页,并发请求,不用上次单独请求第一页的结果 for i := 2; i <= pageCount; i++ { // 20个goroutine休眠5ms, 防止timeout if i/20 == 0 { time.Sleep(5 * time.Microsecond) } wg.Add(1) //i := i go func() { defer func() { if err := recover(); err != nil { tracelog.Error(ctx, fmt.Sprintf("GetCouponList recover: %v", err)) } wg.Done() }() // do something }() } wg.Wait() // ...
参考:
1.6 来,控制一下 goroutine 的并发数量【煎鱼博客,主要意识到打印时间戳观察并发任务】