Golang高并发

并发基础

你电脑只有 1 颗 CPU,也能同时开 20 个浏览器页面,对吧?
其实不是同时做,而是:

CPU 速度非常快
它切换任务 → 我们看起来像同时做

Go 的并发,也是同一个思想:
创建很多“小任务”,轮流执行,看起来像是一起执行的。

Go func是什么?

某个函数执行

result := doTask(url)
👉 会阻塞:一条没做完,下一条不能做

Go 的核心 加上

go doTask(url)

这叫 启动一个协程(goroutine)
它会后台运行,不等它执行完
主程序继续往下走

最基础的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"net/http"
"sync"
"time"
)

func main() {

client := &http.Client{Timeout: 2 * time.Second}

urls := []string{
"https://www.baidu.com",
"https://www.qq.com",
"https://www.bilibili.com",
}

var wg sync.WaitGroup
wg.Add(len(urls))

start := time.Now()

for _, u := range urls {
go func(link string) {
defer wg.Done()

resp, err := client.Get(link)
if err != nil {
fmt.Println("请求失败:", link)
return
}
resp.Body.Close()
fmt.Println("完成:", link)
}(u)
}
wg.Wait()

fmt.Printf("全部完成!耗时:%v\n", time.Since(start))
}

并发控制?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"fmt"
"net/http"
"sync"
"time"
)

func main() {

urls := []string{}
for i := 0; i < 100; i++ {
urls = append(urls, "https://www.baidu.com")
}

client := &http.Client{Timeout: 2 * time.Second}

// 限制并发为 20
limit := make(chan struct{}, 20)

var wg sync.WaitGroup
begin := time.Now()

for _, u := range urls {
wg.Add(1)

limit <- struct{}{} // 占一个坑(如果满了就阻塞)

go func(link string) {
defer wg.Done()
defer func() { <-limit }() // 用完释放一个坑
resp, err := client.Get(link)
if err == nil {
resp.Body.Close()
}

fmt.Println("完成:", link)
}(u)
}

wg.Wait()
fmt.Println("全部完成,用时:", time.Since(begin))
}

高并发核心

认识这个库

github.com/panjf2000/ants/v2

1
var pool, _ = ants.NewPool(10)

什么意思

创建了 10个协程池,可以同时执行10个任务。

为什么不用go func() 的形式

方式 问题 适用于谁
直接 go func() 协程数量无上限 → 内存爆炸 → 拥塞 小量任务
make(chan) + go routine 自己控制 代码量大,容易踩坑 熟练工程师
ants.Pool 自动回收调度、限制并发、复用 goroutine 最佳方案

简易使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"

"github.com/panjf2000/ants/v2"
)

func main() {

// 最大并发数:20
pool, _ := ants.NewPool(20)
defer pool.Release()

start := time.Now()

for i := 0; i < 100; i++ {
num := i
pool.Submit(func() {
time.Sleep(100 * time.Millisecond) // 模拟任务
fmt.Println("完成任务:", num)
})
}

// 等所有任务做完
pool.Release() // 关闭池子并等待任务完成

fmt.Println("耗时:", time.Since(start))
}

进阶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"net/http"
"time"

"github.com/panjf2000/ants/v2"
)

func main() {

urls := []string{}
for i := 0; i < 100; i++ {
urls = append(urls, "https://www.baidu.com")
}

client := &http.Client{Timeout: 2 * time.Second}

pool, _ := ants.NewPool(20)
defer pool.Release()

begin := time.Now()

for _, u := range urls {
link := u
pool.Submit(func() {
resp, err := client.Get(link)
if err == nil {
resp.Body.Close()
}
fmt.Println("✅ 完成:", link)
})
}

pool.Release() // 等任务全部结束
fmt.Println("总耗时:", time.Since(begin))
}

如何return

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

package main

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/panjf2000/ants/v2"
)

type Result struct {
Url string
Ok bool
Cost time.Duration
}

func main() {

urls := []string{
"https://www.baidu.com",
"https://www.google.com",
"https://www.bilibili.com",
"https://www.qq.com",
}

client := &http.Client{Timeout: 2 * time.Second}

results := make([]Result, len(urls))
var wg sync.WaitGroup
wg.Add(len(urls))

pool, _ := ants.NewPool(20)
defer pool.Release()

for i, u := range urls {

idx := i
urlCopy := u

pool.Submit(func() {
defer wg.Done()

t := time.Now()
resp, err := client.Get(urlCopy)
ok := false
if err == nil {
resp.Body.Close()
ok = true
}

results[idx] = Result{
Url: urlCopy,
Ok: ok,
Cost: time.Since(t),
}
})
}

wg.Wait() // 等全部完成

fmt.Println("执行结果↓")

for _, r := range results {
fmt.Println(r)
}
}