Go 并发:Channel 通道

在上一篇文章《Go 并发机制:Goroutine》中,我们讨论了 Go 如何使用 goroutine 实现并发。本文我们继续讨论 goroutine 如何利用 channel 来进行通信。

什么是 channel

Channel,通道,可看作是 goroutine 进行通信的管道(pipe)。跟水可以在管道中从一端流向另一端类似,数据也可以通过通道从一端发送,从另一端接收。

声明 channel

每一个通道都有一个具体的类型,即这个通道允许传输的数据类型。

1
ch T

一个类型为 T 的通道。

通道的零值为 nil,可以使用 make 来定义通道。例如:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
var a chan int
if a == nil {
fmt.Println("channel a is nil, going to define it")
a = make(chan int)
fmt.Printf("Type of a is %T", a)
}
}

上面的程序中,首先声明了 a 通道,且a 通道的类型为 int。由于 a 通道开始的值为 nil,接下来进行通道的定义。

运行程序,输出:

1
2
channel a is nil, going to define it
Type of a is chan int

通常来说,也可以使用短变量声明来定义一个通道:

1
a := make(chan int)  

上面的代码,同样定义了一个类型为 int 的通道。

通过 channel 收发数据

可以通过通道收发数据,例如:

1
2
data := <- a // 从通道 a 接收数据 
a <- data // 往通道 a 发送数据

通过通道发送和接收数据都使用 <- 操作符。第一行中,箭头指向变量 data,表示从通道中接收数据,并保存至变量 data 中。
第二行中,箭头指向通道 a,表示往通道 a 发送数据。

默认情况下,从通道接收数据和往通道发送数据都会阻塞。对于发送方来说,发送数据将会一直阻塞,直到另一个 goroutine 从通道读取数据。同理,对于接收方来说,接收数据将会一直阻塞,直到另一个 goroutine 往通道发送数据。
通道的这种特性有利于 goroutine 在不显式使用锁或者条件变量的情况进行有效的通信。

例子

我们继续以上一篇文章《Go 并发机制:Goroutine》中的程序为例,改用通道来实现。

为便于对照,这里附上原程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
"time"
)

func hello() {
fmt.Println("Hello world goroutine")
}

func main() {
go hello()
time.Sleep(1 * time.Second)
fmt.Println("main function")
}

为避免 main goroutine 没有等待 hello goroutine 执行完毕就直接退出,main goroutine 使用了 Sleep 睡眠以等待 1 秒时间。如何改用通道更优雅地实现类似的功能呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
)

func hello(done chan bool) {
fmt.Println("Hello world goroutine")
done <- true
}

func main() {
done := make(chan bool)
go hello(done)
<-done
fmt.Println("main function")
}

在 main goroutine 中,我们创建了 done bool 类型的通道,然后作为参数传递给 hello goroutine,接下来,从 done 通道中接收数据,这时 main goroutine 会阻塞。
<-done 表示从 done 通道接收数据,但并不会将接收的数据保存到其他变量中。在 go 中,这是合法的。

对于 hello goroutine 来说,先打印出 Hello world goroutine ,然后将数据发送至 done 通道。当 hello goroutine 数据发送完毕,main goroutine 从 done 通道接收到数据,main goroutine 不再阻塞,最后打印出 main function

程序输出:

1
2
Hello world goroutine
main function

更复杂的例子

接下来实现个更复杂点的程序,该程序接受一个数字作为输入,例如,123,然后计算该数字包含的数字的平方和立方和:

平方和 = (1 * 1)+(2 * 2)+(3 * 3)= 14
立方和 = (1 * 1 * 1)+(2 * 2 * 2)+(3 * 3 * 3)= 36
输出 = 平方和 + 立方和 = 50

我们使用一个 goroutine 来计算平方和,使用另一个 goroutine 来计算立方和,最后在 main goroutine 求和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
)

func calcSquares(number int, squareop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit
number /= 10
}
squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit * digit
number /= 10
}
cubeop <- sum
}

func main() {
number := 589
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech
fmt.Println("Final output", squares+cubes)
}

calcSquares 函数计算数字的平方和,calcCubes 函数计算数字的立方和,计算完毕,分别将结果发送至通道 squareopcubeop

calcSquares 函数和calcCubes 函数分别运行在不同的 goroutine,main goroutine 等待两个 goroutine 计算完毕,然后从通道接收计算结果,并将计算结果分别保存至变量 squarescubes
最后在 main goroutine 中输出求和结果。

运行程序,得到输出:

1
Final output 1536

死锁

使用通道时需要注意死锁的问题。

Goroutine 在往通道发送数据时,需要保证有其他的 goroutine 从该通道接收数据,否则,会发生死锁。
同理,goroutine 在从通道接收数据时,需要保证有其他的 goroutine 往该通道发送数据,否则,同样会发生死锁。

例如如下程序:

1
2
3
4
5
6
package main

func main() {
ch := make(chan int)
ch <- 5
}

该程序只是往通道 ch 发送数据,并没有其他 gorutine 接收数据。运行程序,会得到死锁报错:

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/xxx.go:5 +0x31

单向 channel

上面我们讨论通道都是双向的,即既可以往通道发送数据,也可以从通道读取数据。
我们也可以创建单向的通道,即该通道只能发送数据或传输数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func sendData(sendch chan<- int) {
sendch <- 10
}

func main() {
sendch := make(chan<- int)
go sendData(sendch)
fmt.Println(<-sendch)
}

在上面的程序中,我们创建一个只允许发送数据的通道 sendchchan<- int 箭头指向 chan 表示通道只允许发送数据。

fmt.Println(<-sendch) 表示从通道 sendch 接收数据,可以看到编译器报错:

1
invalid operation: cannot receive from send-only channel sendch (variable of type chan<- int)

单向的通道有什么作用呢?

在 go 中,双向通道可以转化为单向通道,却不允许单向通道转发为双向通道。利用这个特性,改写上面的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func sendData(sendch chan<- int) {
sendch <- 10
}

func main() {
chnl := make(chan int)
go sendData(chnl)
fmt.Println(<-chnl)
}

上面的程序中,chnl := make(chan int) 创建了一个双向通道 chnl,通道 chnl 作为参数传递给函数 sendData。函数 sendData 的将参数转化为单向通道 sendch,这样在函数 sendData 中,只允许往通道 sendch 发送数据。但在 main 函数中,仍然可以从通道接收数据。最终程序的输出为 10

关闭 channel

发送者可以对通道进行关闭,以通知接收者数据已发送完毕。接收者在接收数据时可以增加一个额外的变量来判断通道是否已关闭。

1
v, ok := <- ch  

变量 ok 用于判断通道是否已关闭。 当ok 值为 true 时,接收者可以正常从通道接收数据;当 ok 值为 false 时,表明通道已关闭,这时,从通道接收到的数据为零值。例如,如果通道为 int 类型,则从通道接收的数据为 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
)

func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
}
close(chnl)
}

func main() {
ch := make(chan int)
go producer(ch)
for {
v, ok := <-ch
if !ok {
break
}
fmt.Println("Received ", v, ok)
}
}

在上面的程序中,producer goroutine 向通道 chnl 发送数据 0 - 9,然后关闭通道。main goroutine 在循环中检测变量 ok 的值,如果 ok 为 false,则表明通道已关闭跳出循环,如果 ok 为 true,则打印出接收到的数字:

1
2
3
4
5
6
7
8
9
10
Received  0 true
Received 1 true
Received 2 true
Received 3 true
Received 4 true
Received 5 true
Received 6 true
Received 7 true
Received 8 true
Received 9 true

for range 同样可以用来从通道接收数据,如果通道被关闭,则会跳出循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
)

func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
}
close(chnl)
}

func main() {
ch := make(chan int)
go producer(ch)
for v := range ch {
fmt.Println("Received ", v)
}
}

程序的输出结果:

1
2
3
4
5
6
7
8
9
10
Received  0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9

我们采用 for range 循环形式来改写上面计算平方和和立方和的程序。

将提取各位上数字的逻辑独立出来形成一个函数 digits,函数 digits 提取各位数字后往通道发送。

函数 calcSquarescalcCubes 调用函数 digits 接收提取到的各位数字。函数 calcSquarescalcCubes 均使用了 for range 循环,当函数 digits 关闭通道后,会跳出 for range 循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
)

func digits(number int, dchnl chan int) {
for number != 0 {
digit := number % 10
dchnl <- digit
number /= 10
}
close(dchnl)
}

func calcSquares(number int, squareop chan int) {
sum := 0
dch := make(chan int)
go digits(number, dch)
for digit := range dch {
sum += digit * digit
}
squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
sum := 0
dch := make(chan int)
go digits(number, dch)
for digit := range dch {
sum += digit * digit * digit
}
cubeop <- sum
}

func main() {
number := 589
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech
fmt.Println("Final output", squares+cubes)
}

同样地,程序输出:

1
Final output 1536

在下一篇文章中,我们将会继续讨论带缓冲的通道的使用。

参考资料