Go并发实践笔记一(基础用法)

不得不说Go很方便实现并发处理,这里针对实际工作中用的到并发做个笔记

本文分两篇,这一篇是基本用法,另一篇涉及到并发度控制,参考《Go并发实践笔记二(进阶用法)》

第一部分 基本用法

一、并发任务是隔离的
对一组数据,进行处理,不需要对处理结果进行聚合返回

1、描述
更新一组SP的数据,遍历更新耗时较旧,使用并发

2、实现

type Spinfo struct {
	ID     int64  `json:"id"`
	Name   string `json:"name"`
	Spcode string `json:"spcode"`
	Status int64  `json:"status"`
}

func Execute(spinfo Spinfo) {
	// do something. exp: update db
	fmt.Printf("name: %s; time: %d\n", spinfo.Name, time.Now().Unix())
}

func DoTask(datas []Spinfo) {
	var wg = sync.WaitGroup{}
	for _, data := range datas {
		wg.Add(1)
		data := data
		go func() {
			defer func() {
				if r := recover(); r!= nil {
					// add log
				}

				wg.Done()
			}()

			Execute(data)
		}()
	}

	wg.Wait()
}

3、测试


func TestDoTask(t *testing.T) {
	datas := make([]Spinfo, 100)

	datas = []Spinfo{
		{
			ID: 1,
			Name: "阳光出行",
			Spcode: "9000",
		},
		{
			ID: 2,
			Name: "滴滴出行",
			Spcode: "9001",
		},
		{
			ID: 3,
			Name: "T3出行",
			Spcode: "9002",
		},
		{
			ID: 4,
			Name: "曹操出行",
			Spcode: "9003",
		},
	}

	DoTask(datas)
}

4、结果

=== RUN   TestDoTask
name: 阳光出行; time: 1683688911
name: 曹操出行; time: 1683688911
name: 滴滴出行; time: 1683688911
name: T3出行; time: 1683688911
--- PASS: TestDoTask (0.00s)
PASS

二、并发任务需要聚合结果通过sync.Map
1、描述
实际工作中,很多并发场景往往需要把并发请求的结果进行聚合,这是我们需要结合sync.Map实现

2、实现

type Spinfo struct {
	ID     int64  `json:"id"`
	Name   string `json:"name"`
	Spcode string `json:"spcode"`
	Status int64  `json:"status"`
}

// GetData mock get data from api
func GetData(id int64) (data Spinfo) {
	switch id {
	case 1:
		data = Spinfo{
			ID: 1,
			Name: "阳光出行",
			Spcode: "9000",
		}
	case 2:
		data = Spinfo{
			ID: 2,
			Name: "滴滴出行",
			Spcode: "9001",
		}
	case 3:
		data = Spinfo{
			ID: 3,
			Name: "T3出行",
			Spcode: "9002",
		}
	case 4:
		data = Spinfo{
			ID: 4,
			Name: "曹操出行",
			Spcode: "9003",
		}
	}

	fmt.Printf("id: %d; time: %d\n", id, time.Now().Unix())
	time.Sleep(time.Second)

	return
}

func DoTask(ids []int64) map[string]Spinfo {
	/*
	    defer func() {
			if r := recover(); r != nil {
	            log.Println("recover", err)
			}
		}()
	*/

	var wg sync.WaitGroup
	var res sync.Map
	for _, id := range ids {
		wg.Add(1)
		id := id
		go func() {
			defer func() {
				if err := recover(); err != nil {
					log.Println("recover", err)
				}

				wg.Done()
			}()

			data := GetData(id)
			res.Store(data.Spcode, data)
		}()
	}

	wg.Wait()

	ret := make(map[string]Spinfo)
	res.Range(func(key, value interface{}) bool {
		if spinfo, ok := value.(Spinfo); ok {
			ret[spinfo.Spcode] = spinfo
		}
		return true
	})

	return ret
}

3、测试

func TestDoTask(t *testing.T) {
	ids := make([]int64, 100)

	ids = []int64{1,2,3,4}
	res := DoTask(ids)

	fmt.Println(res)
}

4、结果

=== RUN   TestDoTask
id: 4; time: 1683731235
id: 2; time: 1683731235
id: 1; time: 1683731235
id: 3; time: 1683731235
map[9000:{1 阳光出行 9000 0} 9001:{2 滴滴出行 9001 0} 9002:{3 T3出行 9002 0} 9003:{4 曹操出行 9003 0}]
--- PASS: TestDoTask (1.01s)
PASS

三、并发任务需要聚合结果通过读写锁自定义Map

1、描述

2、实现
自定义Map

type SpSafeMap struct {
	mx   sync.RWMutex
	data map[string]Spinfo
}

func (a *SpSafeMap) Set(key string, val Spinfo) {
	a.mx.Lock()
	defer a.mx.Unlock()
	if a.data == nil {
		a.data = make(map[string]Spinfo)
	}
	a.data[key] = val
}

func (a *SpSafeMap) Get(key string) Spinfo {
	a.mx.RLock()
	defer a.mx.RUnlock()
	if a.data == nil {
		a.data = make(map[string]Spinfo)
	}
	return a.data[key]
}

数据处理

type Spinfo struct {
	ID     int64  `json:"id"`
	Name   string `json:"name"`
	Spcode string `json:"spcode"`
	Status int64  `json:"status"`
}

// GetData mock get data from api
func GetData(id int64) (data Spinfo) {
	switch id {
	case 1:
		data = Spinfo{
			ID: 1,
			Name: "阳光出行",
			Spcode: "9000",
		}
	case 2:
		data = Spinfo{
			ID: 2,
			Name: "滴滴出行",
			Spcode: "9001",
		}
	case 3:
		data = Spinfo{
			ID: 3,
			Name: "T3出行",
			Spcode: "9002",
		}
	case 4:
		data = Spinfo{
			ID: 4,
			Name: "曹操出行",
			Spcode: "9003",
		}
	}

	fmt.Printf("id: %d; time: %d\n", id, time.Now().Unix())
	time.Sleep(time.Second)

	return
}

func DoTask(ids []int64) map[string]Spinfo {
	/*
	    defer func() {
			if r := recover(); r != nil {
	            log.Println("recover", err)
			}
		}()
	*/

	var wg sync.WaitGroup
	var res SpSafeMap
	for _, id := range ids {
		wg.Add(1)
		id := id
		go func() {
			defer func() {
				if err := recover(); err != nil {
					log.Println("recover", err)
				}

				wg.Done()
			}()

			data := GetData(id)
			res.Set(data.Spcode, data)
		}()
	}

	wg.Wait()

	return res.data
}

3、测试

4、结果

=== RUN   TestDoTask
id: 4; time: 1683731643
id: 2; time: 1683731643
id: 1; time: 1683731643
id: 3; time: 1683731643
map[9000:{1 阳光出行 9000 0} 9001:{2 滴滴出行 9001 0} 9002:{3 T3出行 9002 0} 9003:{4 曹操出行 9003 0}]
--- PASS: TestDoTask (1.00s)
PASS

四、总结
1、通过读写锁自定义Map,对结果不用Range和断言
2、每个goroutine中 recover panic【个人心得:如果一个goroutine出现panic, 没有recover, 全部goroutine都会中断。看程序情况,如果一个goroutine panic 整个逻辑都异常的话,可以在函数开头统一写recover 参考line2-line8; 如果一个goroutine panic 只影响当前goroutine, 就可以为每个goroutine写recover】

发表评论

电子邮件地址不会被公开。 必填项已用*标注