Go并发实践笔记二(进阶用法)

第二部分 进阶用法-并发度控制

在基础用法中,我们很简单的实现了并发处理数据,但是随着数据量增多,隐藏的问题就会浮出水面。
并发操作数据库,随数据增加,goroutine增多,会出现部分routine invalid connection DB, 可能是超过了DB的连接数
并发调用下游API,随数据增加,goroutine增多,会触发下游API限流,导致部分请求失败

这个时候就需要进行并发度控制

一、并发度控制,基于sync和channel实现
实现原理:使用sync.WaitGroup和channel结合,先启动指定个数goroutine, 每个goroutine阻塞在channel口循环接收数据,直到channel关闭,核心是把数据分给多个goroutine,每个goroutine循环处理

1、描述
定义最大并发数maxConcurrency,启动maxConcurrency个goroutine, 每个goroutine阻塞等待着从channel取待执行的任务数据

2、实现

func DoTask(datas []Spinfo) {
	var wg = sync.WaitGroup{}
	var c = make(chan Spinfo, 1)

	var maxConcurrency = 2
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer func() {
				if r := recover(); r!= nil {
					// add log
				}

				wg.Done()
			}()

			for {
				// c中有值,并且取到了ok才为true, 否则一直阻塞,只到c关闭
                                data, ok := <-c
				if !ok {
					return
				}
				Execute(data)
			}
		}()
	}

	for _, data := range datas {
		c <- data
	}

	close(c)
	wg.Wait()
}

3、测试
同上

4、结果

=== RUN   TestDoTask
name: 滴滴出行; time: 1683710351
name: 阳光出行; time: 1683710351
name: T3出行; time: 1683710352
name: 曹操出行; time: 1683710352
--- PASS: TestDoTask (2.01s)
PASS

二、并发度控制,基于errgroup和channel
实现原理:使用errgroup封装的sync.WaitGroup能力,本质和上面一样

1、描述

2、实现

import "golang.org/x/sync/errgroup"

func DoTask(datas []Spinfo) {
	var g errgroup.Group
	var c = make(chan Spinfo, 1)

	var maxConcurrency = 2
	for i := 0; i < maxConcurrency; i++ {
		g.Go(func() (err error) {
			for {
				data, ok := <-c
				if !ok {
					return err
				}
				Execute(data)
			}
		})
	}

	for _, data := range datas {
		c <- data
	}

	close(c)
	g.Wait()
}

3、测试
同上

4、结果
同上

三、并发度控制,基于errgroup实现
1、描述
errgroup包内部实现了并发控制

2、实现

func DoTask(datas []Spinfo) {
	var g errgroup.Group

        var maxConcurrency = 2
	g.SetLimit(maxConcurrency)
	for _, data := range datas {
		data := data
		g.Go(func() (err error) {
			Execute(data)

			return err
		})
	}

	g.Wait()
}

3、测试
同上

4、结果

=== RUN   TestDoTask
name: 滴滴出行; time: 1683712103
name: 阳光出行; time: 1683712103
name: 曹操出行; time: 1683712104
name: T3出行; time: 1683712104
--- PASS: TestDoTask (2.01s)
PASS

四、并发度控制,基于sync和channel实现二

1、描述
定义有缓冲区的channel, 缓冲区大小为最大并发数,初始创建maxConcurrency个goroutine, 处理完后,在创建新的goroutine

2、实现

func DoTask(datas []Spinfo) {
	var wg sync.WaitGroup
	var maxConcurrency = 2
	var c = make(chan int64, maxConcurrency )
	for _, data := range datas {
		wg.Add(1)
		c <- 1
		data := data
		go func() {
			defer func() {
				<-c
				wg.Done()
			}()

			Execute(data)
		}()
	}

	wg.Wait()
}

3、测试
同上

4、结果
同上

五、并发度控制,任务池实现
任务池pool实现【github上一百多行,实现的任务池,项目中用过,可以设置并发数,挺好用】

六、并发度控制,题外话
一开始没有控制并发数,突然有一天发现,并发请求触发了依赖接口的限流,当时还不知道有并发数控制的概念,临时处理是20个并发,休息一会,现在回头看之前的代码,忍不住偷笑,这大概就是进步吧,回头得找个时间优化下

1、休息的代码赏析

        // ...
        wg := &sync.WaitGroup{}
	pageCount := int(math.Ceil(total.(float64) / float64(pagesize)))
	// 如果有多页,并发请求,不用上次单独请求第一页的结果
	for i := 2; i <= pageCount; i++ {
		// 20个goroutine休眠5ms, 防止timeout
		if i/20 == 0 {
			time.Sleep(5 * time.Microsecond)
		}
		wg.Add(1)
		//i := i
		go func() {
			defer func() {
				if err := recover(); err != nil {
					tracelog.Error(ctx, fmt.Sprintf("GetCouponList recover: %v", err))
				}

				wg.Done()
			}()
			
			// do something
		}()
	}
	
	wg.Wait()
        // ...

参考:
1.6 来,控制一下 goroutine 的并发数量【煎鱼博客,主要意识到打印时间戳观察并发任务】

发表评论

电子邮件地址不会被公开。 必填项已用*标注