什么是缓冲通道?
我们在之前的教程中讨论的所有通道基本上都是无缓冲的。正如我们在通道教程中详细讨论的那样,向无缓冲通道的发送和接收是阻塞的。
我们可以创建一个有缓冲区的通道。只有当缓冲区满了的时候,向缓冲区的发送才会被阻止。同样,从一个缓冲通道的接收只有在缓冲区为空时才会被阻止。
缓冲通道可以通过向make
函数传递一个额外的容量参数来创建,该参数指定了缓冲区的大小。
1 | ch := make(chan type, capacity) |
在上面的语法中,容量应该大于0,通道才有缓冲区。一个没有缓冲的通道的容量默认为0,因此我们在前面的教程中创建通道时省略了容量参数。
让我们写一些代码,创建一个有缓冲的通道。
例子
1 | package main |
在上面的程序中,在第9行,我们创建了一个缓冲通道。由于该通道的容量为2,所以可以向该通道写入2个字符串而不被阻塞。我们在第10行和第11行向通道写了2个字符串,通道没有阻塞。我们在第12行和第13行分别读取所写的2个字符串。这个程序会打印:
1 | naveen |
另一个例子
让我们再看一个缓冲通道的例子,在这个例子中,通道的值是在一个并发的Goroutine中写入的,并从主Goroutine中读取。这个例子将帮助我们更好地理解什么时候写到缓冲通道块。
1 | package main |
在上面的程序中,在main
Goroutine的第16行创建了一个容量为2
的缓冲通道,并在第17行传递给write
Goroutine,然后main
Goroutine休眠2秒。在这段时间里,写Goroutine同时在运行。write
Goroutine有一个for
循环,将0到4的数字写到ch
通道中。这个缓冲通道的容量是2
,因此write
Goroutine将能够立即向ch
通道写入0和1的值,然后它阻塞,直到至少有一个值从ch
通道被读出。因此,这个程序将立即打印以下2行。
1 | successfully wrote 0 to ch |
在打印完上面两行后,在write
Goroutine中对ch
通道的写入被阻止了,直到有人从ch
通道中读出。由于main
Goroutine在开始从通道中读出之前会sleep 2秒,所以程序在接下来的2秒内不会打印任何东西。main
Goroutine在2秒后醒来,在第19行使用for range
循环开始读取ch
通道的数据。打印读出的值,然后再sleep 2秒,这个循环一直持续到ch
被关闭。因此,程序将在2秒后打印以下几行。
1 | read value 0 from ch |
这将继续下去,直到所有的值都被写入通道,并且在write
Goroutine中被关闭。最后的输出:
1 | successfully wrote 0 to ch |
Deadlock(死锁)
1 | package main |
在上面的程序中,我们写了3个字符串到一个容量为2的缓冲通道。当control到达第11行的第三个写入点时。由于该通道已经达到了它可以容纳数据的上线(也就是其的容量),所以写入被阻止了。现在,一些Goroutine必须从该通道中读取,以便继续写,但在这种情况下,没有并发的例程从该通道中读取。因此,将出现一个死锁,程序在运行时将会出现以下的报错信息。
1 | fatal error: all goroutines are asleep - deadlock! |
关闭缓冲通道
我们在前面的教程中已经讨论了关闭通道的问题。除了在前面的教程中所学到的,在关闭缓冲通道时,还有一个微妙的问题需要考虑。
从一个已经关闭的缓冲通道中读取数据是可能的。该通道将返回已经写入该通道的数据,一旦所有的数据都被读取,它将返回该通道的零值。
让我们写一个程序来理解这个问题。
1 | package main |
在上面的程序中,我们在第8行创建了一个容量为5
的缓冲通道。然后我们把5
和6
写到通道中。之后在第8行关闭该通道。即使通道被关闭,我们也可以读取已经写入通道的值。这在第12和14行中完成。在第12行中,n
的值为5
,open
为true
。在第14行中n
的值将是6
,open
将再次为true
。现在我们已经完成了从通道中读取5
和6
的工作,没有更多的数据需要读取。现在,当我们在第16行再次读取通道时,n
的值将是0
,也就是int
的零值,open
将是false
,表明通道已经关闭。
这个程序将打印:
1 | Received: 5, open: true |
同样的程序也可以用for range循环来写。
1 | package main |
上述程序第12行中的for range
循环将读取所有写入通道的值,一旦没有更多的值可以读取,就会退出,因为通道已经关闭。
这个程序将打印:
1 | Received: 5 |
长度与容量
缓冲通道的容量是该通道可以容纳的数值的数量。这是我们在使用make
函数创建缓冲通道时指定的数值。
缓冲通道的长度是当前排在其中的元素的数量。
一个程序将使事情变得更清楚 😀
1 | package main |
在上面的程序中,创建的通道的容量是3
,也就是说,它可以容纳3个字符串。然后我们在第9行和第10行分别向该通道写入2个字符串。现在该通道有2个字符串排队,因此其长度为2。在第13行,我们从通道中读取一个字符串。现在该通道只有一个字符串在排队,因此它的长度是1。这个程序将打印:
1 | capacity is 3 |
WaitGroup(等待组)
本教程的下一节是关于工作池的。为了理解工作池,我们首先需要了解WaitGroup
,因为它将在工作池的实现中使用。
一个WaitGroup
用于等待Goroutines的集合完成执行。控制被阻断,直到所有的Goroutine完成执行。比方说,我们有3个同时执行的Goroutine,都是从main
Goroutine生成的。main
Goroutine需要等待其他3个Goroutine完成后再终止。这可以通过WaitGroup
来实现。
让我们停止理论,马上写些代码吧 😀
1 | package main |
WaitGroup
是一个结构类型,我们在第18行创建一个WaitGroup
类型的零值变量。WaitGroup
的工作方式是通过使用一个计数器。当我们在WaitGroup
上调用Add
并传递给它一个int
时,WaitGroup
的计数器会被传递给Add
的值所增加。减少计数器的方法是通过调用WaitGroup
上的Done()
方法。Wait()
方法阻断被调用的Goroutine,直到计数器变为零。
在上面的程序中,我们在for循环中的第20行调用了wg.Add(1)
,该循环迭代了3次。所以计数器现在变成了3。for
循环也产生了3个进程Goroutine,然后在第23行调用wg.Wait()
使main
Goroutine等待,直到计数器变为0。该计数器被第13行的进程Goroutine中的wg.Done()
调用而减去。 一旦所有3个生成的Goroutine完成它们的执行,也就是一旦wg.Done()
被调用3次,计数器将变成0,main
Goroutine将被解锁。
在第21行传递wg
的指针是很重要的。如果没有传递指针,那么每个Goroutine都会有自己的WaitGroup
副本,当它们执行完毕时,main
将不会得到通知。
这个程序的输出。
1 | started Goroutine 2 |
你的输出可能与我的不同,因为Goroutine的执行顺序可能不同:)。
Worker Pool的实现
缓冲通道的一个重要用途是实现工人池。
一般来说,工人池是一个线程的集合,它们正在等待任务被分配给它们。一旦他们完成了分配的任务,他们就会再次为下一个任务提供服务。
我们将使用缓冲通道实现一个工人池。我们的工人池将执行寻找输入数字的位数之和的任务。例如,如果传递234,输出将是9(2+3+4)。工人池的输入将是一个伪随机整数的列表。
以下是我们工人池的核心功能
- 创建一个Goroutines池,在输入缓冲通道上监听,等待工作分配。
- 将作业添加到输入缓冲通道中
- 在作业完成后将结果写入输出缓冲通道
- 从输出缓冲通道读取和打印结果
我们将一步一步地编写这个程序,使其更容易理解。
第一步将是创建代表工作和结果的结构。
1 | type Job struct { |
每个Job
结构都有一个id
和一个用于计算各个数字的总和的randomno
。
Result
结构有一个job
字段以及一个sumofdigits
字段用于保存结果(各个数字之和)的作业。
下一步是创建缓冲通道,用于接收作业和写入输出。
1 | var jobs = make(chan Job, 10) |
Worker
Goroutines在jobs
缓冲通道上监听新的任务。一旦任务完成,其结果将被写入Result
缓冲通道。
下面的digits
函数做的实际工作是找出一个整数的各个数字之和并返回。我们将给这个函数添加一个2秒的sleep时间,只是为了模拟这个函数需要一些时间来计算结果的事实。
1 | func digits(number int) int { |
接下来,我们将写一个函数来创建一个worker
Goroutine。
1 | func worker(wg *sync.WaitGroup) { |
上述函数创建了一个worker
,它从jobs
通道读取信息,使用当前作业和digits
函数的返回值创建一个Result
结构,然后将结果写入results
缓冲通道。这个函数接收一个WaitGroup wg
作为参数,当所有jobs
都完成后,它将调用Done()
方法。
createWorkerPool
函数将创建一个Goroutines工作池。
1 | func createWorkerPool(noOfWorkers int) { |
上面的函数把要创建的workers
数量作为一个参数。它在创建Goroutine之前调用wg.Add(1)
,以增加WaitGroup计数器。然后,它通过将WaitGroup wg
的指针传递给worker
函数来创建worker
Goroutine。在创建完所需的Goroutine后,它通过调用wg.Wait()
来等待所有的Goroutine完成它们的执行。在所有的Goroutines完成执行后,它关闭了results
通道,因为所有的Goroutines都完成了执行,没有人再向results
通道写东西。
现在我们已经准备好了工人池,让我们继续写函数,将工作分配给 workers
。
1 | func allocate(noOfJobs int) { |
上面的allocate
函数把要创建的jobs
数量作为输入参数,生成最大值为998
的伪随机数,使用随机数和for循环计数器i
作为id
创建Job
结构,然后把它们写到作业通道中。在写完所有jobs
后,它关闭了jobs
通道。
下一步将是创建读取results
通道并打印输出的函数。
1 | func result(done chan bool) { |
result
函数读取results
通道并打印出 job ID、输入的随机数和随机数的位数之和。result
函数还需要一个已完成的通道作为参数,一旦它打印了所有的结果,就会写入该通道。
我们现在已经设置好了一切。让我们继续完成最后一步,从main()
函数中调用所有这些函数。
1 | func main() { |
我们首先在main
函数的第2行存储程序的执行开始时间,在最后一行(第12行)我们计算endTime
和startTime
之间的时间差并显示程序运行的总时间。这是有必要的,因为我们将通过改变Goroutines的数量来做一些基准测试。
noOfJobs
被设置为100,然后调用allocate
将工作添加到jobs
通道中。
然后创建done
通道,并将其传递给result
Goroutine,这样它就可以开始打印输出,并在所有内容都打印完毕后发出通知。
最后,通过调用createWorkerPool
函数创建了一个由10
个worker
Goroutine组成的池,然后main
在done
通道上等待所有的results被打印出来。
下面是完整的程序供你参考。我也导入了必要的软件包。
1 | package main |
请在你的本地机器上运行这个程序,以便在计算总时间时更加准确。
这个程序会打印。
1 | Job id 1, input random no 636, sum of digits 15 |
总共100行将被打印出来,与100个工作相对应,最后一行将打印出程序运行的总时间。你的输出结果将与我的不同,因为Goroutines可以按任何顺序运行,总时间也将根据硬件的不同而不同。在我的例子中,程序完成大约需要20秒。
现在让我们把main
函数中的noOfWorkers
增加到20
。我们已经将workers
的数量增加了一倍。由于worker
的Goroutines增加了(准确地说是增加了一倍),程序完成的总时间应该减少(准确地说是减少了一半)。在我的例子中,它变成了10.004364685秒,程序被打印出来。
1 | ... |
现在我们可以理解,随着worker
Goroutine数量的增加,完成工作所需的总时间会减少。我留下一个练习,让你把main
函数中的noOfJobs
和noOfWorkers
换成不同的值,并分析其结果。
这样我们就到了本教程的结尾。祝你有个愉快的一天。