Mark blog

知行合一 划水归档

Go 语言简介(三)

goroutine

GoRoutine主要是使用go关键字来调用函数,还可以使用匿名函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import "fmt"

func f(msg string) {
fmt.Println(msg)
}

func main(){
go f("goroutine")

go func(msg string) {
fmt.Println(msg)
}("going")
}

我们再来看一个示例,下面的代码中包括很多内容,包括时间处理,随机数处理,还有goroutine的代码。如果你熟悉C语言,你应该会很容易理解下面的代码。

你可以简单的把go关键字调用的函数想像成pthread_create。下面的代码使用for循环创建了3个线程,每个线程使用一个随机的Sleep时间,然后在routine()函数中会输出一些线程执行的时间信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "fmt"
import "time"
import "math/rand"

func routine(name string, delay time.Duration) {

t0 := time.Now()
fmt.Println(name, " start at ", t0)

time.Sleep(delay)

t1 := time.Now()
fmt.Println(name, " end at ", t1)

fmt.Println(name, " lasted ", t1.Sub(t0))
}

func main() {

//生成随机种子
rand.Seed(time.Now().Unix())

var name string
for i:=0; i<3; i++{
name = fmt.Sprintf("go_%02d", i) //生成ID
//生成随机等待时间,从0-4秒
go routine(name, time.Duration(rand.Intn(5)) * time.Second)
}

//让主进程停住,不然主进程退了,goroutine也就退了
var input string
fmt.Scanln(&input)
fmt.Println("done")
}

运行的结果可能是:

1
2
3
4
5
6
7
8
9
go_00  start at  2012-11-04 19:46:35.8974894 +0800 +0800
go_01 start at 2012-11-04 19:46:35.8974894 +0800 +0800
go_02 start at 2012-11-04 19:46:35.8974894 +0800 +0800
go_01 end at 2012-11-04 19:46:36.8975894 +0800 +0800
go_01 lasted 1.0001s
go_02 end at 2012-11-04 19:46:38.8987895 +0800 +0800
go_02 lasted 3.0013001s
go_00 end at 2012-11-04 19:46:39.8978894 +0800 +0800
go_00 lasted 4.0004s

goroutine的并发安全性

关于goroutine,我试了一下,无论是Windows还是Linux,基本上来说是用操作系统的线程来实现的。不过,goroutine有个特性,也就是说,如果一个goroutine没有被阻塞,那么别的goroutine就不会得到执行。这并不是真正的并发,如果你要真正的并发,你需要在你的main函数的第一行加上下面的这段代码:

1
2
3
import "runtime"
...
runtime.GOMAXPROCS(4)

还是让我们来看一个有并发安全性问题的示例(注意:我使用了C的方式来写这段Go的程序)

这是一个经常出现在教科书里卖票的例子,我启了5个goroutine来卖票,卖票的函数sell_tickets很简单,就是随机的sleep一下,然后对全局变量total_tickets作减一操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import "fmt"
import "time"
import "math/rand"
import "runtime"

var total_tickets int32 = 10;

func sell_tickets(i int){
for{
if total_tickets > 0 { //如果有票就卖
time.Sleep( time.Duration(rand.Intn(5)) * time.Millisecond)
total_tickets-- //卖一张票
fmt.Println("id:", i, " ticket:", total_tickets)
}else{
break
}
}
}

func main() {
runtime.GOMAXPROCS(4) //我的电脑是4核处理器,所以我设置了4
rand.Seed(time.Now().Unix()) //生成随机种子

for i := 0; i < 5; i++ { //并发5个goroutine来卖票
go sell_tickets(i)
}
//等待线程执行完
var input string
fmt.Scanln(&input)
fmt.Println(total_tickets, "done") //退出时打印还有多少票
}

这个程序毋庸置疑有并发安全性问题,所以执行起来你会看到下面的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$go run sell_tickets.go
id: 0 ticket: 9
id: 0 ticket: 8
id: 4 ticket: 7
id: 1 ticket: 6
id: 3 ticket: 5
id: 0 ticket: 4
id: 3 ticket: 3
id: 2 ticket: 2
id: 0 ticket: 1
id: 3 ticket: 0
id: 1 ticket: -1
id: 4 ticket: -2
id: 2 ticket: -3
id: 0 ticket: -4
-4 done

可见,我们需要使用上锁,我们可以使用互斥量来解决这个问题。下面的代码,我只列出了修改过的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import "fmt"
import "time"
import "math/rand"
import "sync"
import "runtime"

var total_tickets int32 = 10;
var mutex = &sync.Mutex{} //可简写成:var mutex sync.Mutex

func sell_tickets(i int){
for total_tickets>0 {
mutex.Lock()
if total_tickets > 0 {
time.Sleep( time.Duration(rand.Intn(5)) * time.Millisecond)
total_tickets--
fmt.Println(i, total_tickets)
}
mutex.Unlock()
}
}
.......
......

原子操作

说到并发就需要说说原子操作在这里就举一个很简单的示例:下面的程序有10个goroutine,每个会对cnt变量累加20次,所以,最后的cnt应该是200。如果没有atomic的原子操作,那么cnt将有可能得到一个小于200的数。

下面使用了atomic操作,所以是安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import "fmt"
import "time"
import "sync/atomic"

func main() {
var cnt uint32 = 0
for i := 0; i < 10; i++ {
go func() {
for i:=0; i<20; i++ {
time.Sleep(time.Millisecond)
atomic.AddUint32(&cnt, 1)
}
}()
}
time.Sleep(time.Second)//等一秒钟等goroutine完成
cntFinal := atomic.LoadUint32(&cnt)//取数据
fmt.Println("cnt:", cntFinal)
}

这样的函数还有很多,参看go的atomic包文档

Channel信道

Channal是什么?Channal就是用来通信的,就像Unix下的管道一样,在Go中是这样使用Channel的。

下面的程序演示了一个goroutine和主程序通信的例程。这个程序足够简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import "fmt"

func main() {
//创建一个string类型的channel
channel := make(chan string)

//创建一个goroutine向channel里发一个字符串
go func() { channel <- "hello" }()

msg := <- channel
fmt.Println(msg)
}

指定channel的buffer

指定buffer的大小很简单,看下面的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main
import "fmt"

func main() {
channel := make(chan string, 2)

go func() {
channel <- "hello"
channel <- "World"
}()

msg1 := <-channel
msg2 := <-channel
fmt.Println(msg1, msg2)
}

Channel的阻塞

注意,channel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。

下面这个例程说明了这一点,代码有点乱,不过我觉得不难理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import "fmt"
import "time"

func main() {

channel := make(chan string) //注意: buffer为1

go func() {
channel <- "hello"
fmt.Println("write \"hello\" done!")

channel <- "World" //Reader在Sleep,这里在阻塞
fmt.Println("write \"World\" done!")

fmt.Println("Write go sleep...")
time.Sleep(3*time.Second)
channel <- "channel"
fmt.Println("write \"channel\" done!")
}()

time.Sleep(2*time.Second)
fmt.Println("Reader Wake up...")

msg := <-channel
fmt.Println("Reader: ", msg)

msg = <-channel
fmt.Println("Reader: ", msg)

msg = <-channel //Writer在Sleep,这里在阻塞
fmt.Println("Reader: ", msg)
}

上面的代码输出的结果如下:

1
2
3
4
5
6
7
8
Reader Wake up...
Reader: hello
write "hello" done!
write "World" done!
Write go sleep...
Reader: World
write "channel" done!
Reader: channel

Channel阻塞的这个特性还有一个好处是,可以让我们的goroutine在运行的一开始就阻塞在从某个channel领任务,这样就可以作成一个类似于线程池一样的东西。

多个Channel的select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import "time"
import "fmt"

func main() {
//创建两个channel - c1 c2
c1 := make(chan string)
c2 := make(chan string)

//创建两个goruntine来分别向这两个channel发送数据
go func() {
time.Sleep(time.Second * 1)
c1 <- "Hello"
}()
go func() {
time.Sleep(time.Second * 1)
c2 <- "World"
}()

//使用select来侦听两个channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}

注意:上面的select是阻塞的,所以,才搞出ugly的for i <2这种东西

Channel select阻塞的Timeout

解决上述那个for循环的问题,一般有两种方法:一种是阻塞但有timeout,一种是无阻塞。我们来看看如果给select设置上timeout的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for {
timeout_cnt := 0
select {
case msg1 := <-c1:
fmt.Println("msg1 received", msg1)
case msg2 := <-c2:
fmt.Println("msg2 received", msg2)
case <-time.After(time.Second * 30):
fmt.Println("Time Out")
timout_cnt++
}
if time_cnt > 3 {
break
}
}

上面代码中高亮的代码主要是用来让select返回的,注意 case中的time.After事件。

Channel的无阻塞

好,我们再来看看无阻塞的channel,其实也很简单,就是在select中加入default,如下所示:

1
2
3
4
5
6
7
8
9
10
11
for {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
default: //default会导致无阻塞
fmt.Println("nothing received!")
time.Sleep(time.Second)
}
}

Channel的关闭

关闭Channel可以通知对方内容发送完了,不用再等了。参看下面的例程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import "fmt"
import "time"
import "math/rand"

func main() {

channel := make(chan string)
rand.Seed(time.Now().Unix())

//向channel发送随机个数的message
go func () {
cnt := rand.Intn(10)
fmt.Println("message cnt :", cnt)
for i:=0; i<cnt; i++{
channel <- fmt.Sprintf("message-%2d", i)
}
close(channel) //关闭Channel
}()

var more bool = true
var msg string
for more {
select{
//channel会返回两个值,一个是内容,一个是还有没有内容
case msg, more = <- channel:
if more {
fmt.Println(msg)
}else{
fmt.Println("channel closed!")
}
}
}
}

参考自:

GO 语言简介(下)— 特性

https://coolshell.cn/articles/8489.html