缓冲通道和工作池
Q7nl1s admin

什么是缓冲通道?

我们在之前的教程中讨论的所有通道基本上都是无缓冲的。正如我们在通道教程中详细讨论的那样,向无缓冲通道的发送和接收是阻塞的。

我们可以创建一个有缓冲区的通道。只有当缓冲区满了的时候,向缓冲区的发送才会被阻止。同样,从一个缓冲通道的接收只有在缓冲区为空时才会被阻止。

缓冲通道可以通过向make函数传递一个额外的容量参数来创建,该参数指定了缓冲区的大小。

1
ch := make(chan type, capacity)  

在上面的语法中,容量应该大于0,通道才有缓冲区。一个没有缓冲的通道的容量默认为0,因此我们在前面的教程中创建通道时省略了容量参数。

让我们写一些代码,创建一个有缓冲的通道。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
)


func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}

在上面的程序中,在第9行,我们创建了一个缓冲通道。由于该通道的容量为2,所以可以向该通道写入2个字符串而不被阻塞。我们在第10行和第11行向通道写了2个字符串,通道没有阻塞。我们在第12行和第13行分别读取所写的2个字符串。这个程序会打印:

1
2
naveen  
paul

另一个例子

让我们再看一个缓冲通道的例子,在这个例子中,通道的值是在一个并发的Goroutine中写入的,并从主Goroutine中读取。这个例子将帮助我们更好地理解什么时候写到缓冲通道块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)

}
}

在上面的程序中,在main Goroutine的第16行创建了一个容量为2的缓冲通道,并在第17行传递给write Goroutine,然后main Goroutine休眠2秒。在这段时间里,写Goroutine同时在运行。write Goroutine有一个for循环,将0到4的数字写到ch通道中。这个缓冲通道的容量是2,因此write Goroutine将能够立即向ch通道写入0和1的值,然后它阻塞,直到至少有一个值从ch通道被读出。因此,这个程序将立即打印以下2行。

1
2
successfully wrote 0 to ch  
successfully wrote 1 to ch

在打印完上面两行后,在write Goroutine中对ch通道的写入被阻止了,直到有人从ch通道中读出。由于main Goroutine在开始从通道中读出之前会sleep 2秒,所以程序在接下来的2秒内不会打印任何东西。main Goroutine在2秒后醒来,在第19行使用for range循环开始读取ch通道的数据。打印读出的值,然后再sleep 2秒,这个循环一直持续到ch被关闭。因此,程序将在2秒后打印以下几行。

1
2
read value 0 from ch  
successfully wrote 2 to ch

这将继续下去,直到所有的值都被写入通道,并且在write Goroutine中被关闭。最后的输出:

1
2
3
4
5
6
7
8
9
10
successfully wrote 0 to ch  
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

Deadlock(死锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
)

func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}

在上面的程序中,我们写了3个字符串到一个容量为2的缓冲通道。当control到达第11行的第三个写入点时。由于该通道已经达到了它可以容纳数据的上线(也就是其的容量),所以写入被阻止了。现在,一些Goroutine必须从该通道中读取,以便继续写,但在这种情况下,没有并发的例程从该通道中读取。因此,将出现一个死锁,程序在运行时将会出现以下的报错信息。

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/tmp/sandbox091448810/prog.go:11 +0x8d

关闭缓冲通道

我们在前面的教程中已经讨论了关闭通道的问题。除了在前面的教程中所学到的,在关闭缓冲通道时,还有一个微妙的问题需要考虑。

从一个已经关闭的缓冲通道中读取数据是可能的。该通道将返回已经写入该通道的数据,一旦所有的数据都被读取,它将返回该通道的零值。

让我们写一个程序来理解这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
)

func main() {
ch := make(chan int, 5)
ch <- 5
ch <- 6
close(ch)
n, open := <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
}

在上面的程序中,我们在第8行创建了一个容量为5的缓冲通道。然后我们把56写到通道中。之后在第8行关闭该通道。即使通道被关闭,我们也可以读取已经写入通道的值。这在第12和14行中完成。在第12行中,n的值为5opentrue。在第14行中n的值将是6open将再次为true。现在我们已经完成了从通道中读取56的工作,没有更多的数据需要读取。现在,当我们在第16行再次读取通道时,n的值将是0,也就是int的零值,open将是false,表明通道已经关闭。

这个程序将打印:

1
2
3
Received: 5, open: true  
Received: 6, open: true
Received: 0, open: false

同样的程序也可以用for range循环来写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)

func main() {
ch := make(chan int, 5)
ch <- 5
ch <- 6
close(ch)
for n := range ch {
fmt.Println("Received:", n)
}
}

上述程序第12行中的for range循环将读取所有写入通道的值,一旦没有更多的值可以读取,就会退出,因为通道已经关闭。

这个程序将打印:

1
2
Received: 5  
Received: 6

长度与容量

缓冲通道的容量是该通道可以容纳的数值的数量。这是我们在使用make函数创建缓冲通道时指定的数值。

缓冲通道的长度是当前排在其中的元素的数量。

一个程序将使事情变得更清楚 😀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)

func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}

在上面的程序中,创建的通道的容量是3,也就是说,它可以容纳3个字符串。然后我们在第9行和第10行分别向该通道写入2个字符串。现在该通道有2个字符串排队,因此其长度为2。在第13行,我们从通道中读取一个字符串。现在该通道只有一个字符串在排队,因此它的长度是1。这个程序将打印:

1
2
3
4
capacity is 3  
length is 2
read value naveen
new length is 1

WaitGroup(等待组)

本教程的下一节是关于工作池的。为了理解工作池,我们首先需要了解WaitGroup,因为它将在工作池的实现中使用。

一个WaitGroup用于等待Goroutines的集合完成执行。控制被阻断,直到所有的Goroutine完成执行。比方说,我们有3个同时执行的Goroutine,都是从main Goroutine生成的。main Goroutine需要等待其他3个Goroutine完成后再终止。这可以通过WaitGroup来实现。

让我们停止理论,马上写些代码吧 😀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
"time"
)

func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}

func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}

WaitGroup是一个结构类型,我们在第18行创建一个WaitGroup类型的零值变量。WaitGroup的工作方式是通过使用一个计数器。当我们在WaitGroup上调用Add并传递给它一个int时,WaitGroup的计数器会被传递给Add的值所增加。减少计数器的方法是通过调用WaitGroup上的Done()方法。Wait()方法阻断被调用的Goroutine,直到计数器变为零。

在上面的程序中,我们在for循环中的第20行调用了wg.Add(1),该循环迭代了3次。所以计数器现在变成了3。for循环也产生了3个进程Goroutine,然后在第23行调用wg.Wait()使main Goroutine等待,直到计数器变为0。该计数器被第13行的进程Goroutine中的wg.Done()调用而减去。 一旦所有3个生成的Goroutine完成它们的执行,也就是一旦wg.Done()被调用3次,计数器将变成0,main Goroutine将被解锁。

在第21行传递wg的指针是很重要的。如果没有传递指针,那么每个Goroutine都会有自己的WaitGroup副本,当它们执行完毕时,main将不会得到通知。

这个程序的输出。

1
2
3
4
5
6
7
started Goroutine  2
started Goroutine 0
started Goroutine 1
Goroutine 1 ended
Goroutine 0 ended
Goroutine 2 ended
All go routines finished executing

你的输出可能与我的不同,因为Goroutine的执行顺序可能不同:)。

Worker Pool的实现

缓冲通道的一个重要用途是实现工人池

一般来说,工人池是一个线程的集合,它们正在等待任务被分配给它们。一旦他们完成了分配的任务,他们就会再次为下一个任务提供服务。

我们将使用缓冲通道实现一个工人池。我们的工人池将执行寻找输入数字的位数之和的任务。例如,如果传递234,输出将是9(2+3+4)。工人池的输入将是一个伪随机整数的列表。

以下是我们工人池的核心功能

  • 创建一个Goroutines池,在输入缓冲通道上监听,等待工作分配。
  • 将作业添加到输入缓冲通道中
  • 在作业完成后将结果写入输出缓冲通道
  • 从输出缓冲通道读取和打印结果

我们将一步一步地编写这个程序,使其更容易理解。

第一步将是创建代表工作和结果的结构。

1
2
3
4
5
6
7
8
type Job struct {  
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

每个Job结构都有一个id和一个用于计算各个数字的总和的randomno

Result结构有一个job字段以及一个sumofdigits字段用于保存结果(各个数字之和)的作业。

下一步是创建缓冲通道,用于接收作业和写入输出。

1
2
var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

Worker Goroutines在jobs缓冲通道上监听新的任务。一旦任务完成,其结果将被写入Result缓冲通道。

下面的digits函数做的实际工作是找出一个整数的各个数字之和并返回。我们将给这个函数添加一个2秒的sleep时间,只是为了模拟这个函数需要一些时间来计算结果的事实。

1
2
3
4
5
6
7
8
9
10
11
func digits(number int) int {  
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}

接下来,我们将写一个函数来创建一个worker Goroutine。

1
2
3
4
5
6
7
func worker(wg *sync.WaitGroup) {  
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}

上述函数创建了一个worker,它从jobs通道读取信息,使用当前作业和digits函数的返回值创建一个Result结构,然后将结果写入results缓冲通道。这个函数接收一个WaitGroup wg作为参数,当所有jobs都完成后,它将调用Done()方法。

createWorkerPool函数将创建一个Goroutines工作池。

1
2
3
4
5
6
7
8
9
func createWorkerPool(noOfWorkers int) {  
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}

上面的函数把要创建的workers数量作为一个参数。它在创建Goroutine之前调用wg.Add(1),以增加WaitGroup计数器。然后,它通过将WaitGroup wg的指针传递给worker函数来创建worker Goroutine。在创建完所需的Goroutine后,它通过调用wg.Wait()来等待所有的Goroutine完成它们的执行。在所有的Goroutines完成执行后,它关闭了results通道,因为所有的Goroutines都完成了执行,没有人再向results通道写东西。

现在我们已经准备好了工人池,让我们继续写函数,将工作分配给 workers

1
2
3
4
5
6
7
8
func allocate(noOfJobs int) {  
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}

上面的allocate函数把要创建的jobs数量作为输入参数,生成最大值为998的伪随机数,使用随机数和for循环计数器i作为id创建Job结构,然后把它们写到作业通道中。在写完所有jobs后,它关闭了jobs通道。

下一步将是创建读取results通道并打印输出的函数。

1
2
3
4
5
6
func result(done chan bool) {  
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}

result函数读取results通道并打印出 job ID、输入的随机数和随机数的位数之和。result函数还需要一个已完成的通道作为参数,一旦它打印了所有的结果,就会写入该通道。

我们现在已经设置好了一切。让我们继续完成最后一步,从main()函数中调用所有这些函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {  
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

我们首先在main函数的第2行存储程序的执行开始时间,在最后一行(第12行)我们计算endTimestartTime之间的时间差并显示程序运行的总时间。这是有必要的,因为我们将通过改变Goroutines的数量来做一些基准测试。

noOfJobs被设置为100,然后调用allocate将工作添加到jobs通道中。

然后创建done通道,并将其传递给result Goroutine,这样它就可以开始打印输出,并在所有内容都打印完毕后发出通知。

最后,通过调用createWorkerPool函数创建了一个由10worker Goroutine组成的池,然后maindone通道上等待所有的results被打印出来。

下面是完整的程序供你参考。我也导入了必要的软件包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

请在你的本地机器上运行这个程序,以便在计算总时间时更加准确。

这个程序会打印。

1
2
3
4
5
Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.06589 seconds

总共100行将被打印出来,与100个工作相对应,最后一行将打印出程序运行的总时间。你的输出结果将与我的不同,因为Goroutines可以按任何顺序运行,总时间也将根据硬件的不同而不同。在我的例子中,程序完成大约需要20秒。

现在让我们把main函数中的noOfWorkers增加到20。我们已经将workers的数量增加了一倍。由于worker的Goroutines增加了(准确地说是增加了一倍),程序完成的总时间应该减少(准确地说是减少了一半)。在我的例子中,它变成了10.004364685秒,程序被打印出来。

1
2
...
total time taken 10.0249626 seconds

现在我们可以理解,随着worker Goroutine数量的增加,完成工作所需的总时间会减少。我留下一个练习,让你把main函数中的noOfJobsnoOfWorkers换成不同的值,并分析其结果。

这样我们就到了本教程的结尾。祝你有个愉快的一天。

 Comments
Comment plugin failed to load
Loading comment plugin
Powered by Hexo & Theme Keep
Unique Visitor Page View