首页 > Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播

Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播

Select 多路选择

基本使用语法如下:

select { 
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s 
",ret)
case ret := <-retCh2:t.Logf("result %s 
", rest)
default :t.Error("return empty")

关于channel部分其实是阻塞的,也就是select实际执行的过程中会阻塞在对应的channel部分,直到某一个case对应的channel有有效的数据才会执行该case下的逻辑。

实际程序执行的过程中 如果出现两个channel同时有有效数据,那两个case内部的执行顺序是无法严格保证的,只能由程序员自己来控制。

Select 超时控制

同样如上代码,我们要控制channel获取数据的时间,防止channel中等待有效数据时间过长,所以可以增加一些超时控制:

select { 
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s 
",ret)// 等待1s 返回一个channel的有效数据,且第一个case还未得到有效数据,则输出超时
case <- time.After(time.Second * 1): t.Error("time out")

所以 select 可以用于保证多个协程之间的高可用,防止slow response的出现。

以上两种select多路选择用法 的 测试代码如下:

package select_testimport ("fmt""testing""time"
)
func service() string { time.Sleep(time.Millisecond * 400)return "Service1 is Done"
}func AsyncService(i int) chan string { rech := make(chan string,1) // 声明一个channelvar ret stringgo func() { if i == 1 { ret = service()} else { ret = service1()}fmt.Println("resturned result")rech <- ret // 向 channel 中放数据fmt.Println("service exited")}()return rech // 返回channel
}// 测试超时机制来避免等待channel时间过长
// ret 这个channel需要等待AsyncService 函数中的routine中的
// service函数返回结果,才会将数据填充到ret 中
// service需要执行400ms,所以这里会出现超时的情况
func TestSelectTimeout(t *testing.T) { select { case ret := <-AsyncService(1):t.Logf("result is %s", ret)case <- time.After(time.Millisecond * 100):t.Error("time out")}
}func service1() string { time.Sleep(time.Millisecond * 500)return "Service2 is Done"
}// 测试多个channel返回数据,挑选其中一个先准备好的channel来执行
func TestSelect(t *testing.T) { select { case ret1 := <- AsyncService(1):t.Logf("result is %s",ret1)case ret2 := <- AsyncService(2):t.Logf("result is %s", ret2)case <- time.After(time.Millisecond * 600):t.Log("Time out")}
}

channel 的关闭和广播

channel 可以说是Go语言中 协程之间通信的一种机制,支持带buffer和不带buffer两种模式,非常方便得实现不同协程之间的通信过程,但是在具体的通信过程中也会暴露一些问题,如下生产者,消费者代码:

package close_channelimport ("fmt""sync""testing"
)// 数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i ++ { ch <- i}wg.Done()}()
}// 数据消费者
func dataComsumer(ch chan int, wg *sync.WaitGroup) { go func() { // 没有办法准确知道channel中什么时候没有数据,这里保持和生产者相同的填充数据的次数for i := 0;i < 10; i++ {  data := <-chfmt.Printf("consumer data %d
",data)}wg.Done()}()
}func TestProducer(t *testing.T) { var wg sync.WaitGroup // wait groupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait() // 阻塞,直到waitgroup执行完毕,wg的值变为0

输出如下:

=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)

上面代码中消费者协程在channel buffer 内部没有数据的时候只能够被动阻塞等待,直到channel中数据有效。这个实现导致生产者消费者之间的代码耦合度比较高,且当程序中存在多个producer和多个receiver的时候,receivers并不一定能够确切得知道什么时候producer才不会生产数据。

还是如上代码,我们如果启动多个消费者就能够很明显得看到问题,如下测试代码:

func TestProducer(t *testing.T) { var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0

执行的过程中可以发现消费者消费了0,因为这个时候生产者已经不再生产数据了,再去消费的话会取到channel默认的值即0,且channel没有关闭,消费者还在等待有效的数据,还会一直阻塞程序运行。

所以channel 也提供了主动关闭的机制,即当生产者不再发送数据的时候可以主动关闭channel,而消费者再次使用channel的时候只需要确认一下channel的状态即可。如果channel为不可用,即可返回。

关于 关闭的channel 需要注意如下几点:

  • 向关闭的channel 发送数据会导致panic异常
  • v,ok <- ch; 接受channel的值和状态,如果ok为true,则表示channel可以接受数据;如果ok 为false,表示channel已经关闭,无法接受数据
  • 所有channel的接受者在channel关闭的时候都会从阻塞等待中返回上述OK值为false。这个广播机制可以被用作向多个订阅者发送信号。

修改测试代码如下:

package close_channelimport ("fmt""sync""testing"
)func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i ++ { ch <- i}close(ch)//ch <- 11 // 向关闭后的channel发送数据会报panic错误wg.Done()}()
}func dataComsumer(ch chan int, wg *sync.WaitGroup) { go func() { for { if data,ok := <-ch; ok { // 接受关闭后channel的广播,保证channel的输出结果是有效的fmt.Printf("consumer data %d
",data)}else { break}}wg.Done()}()
}func TestProducer(t *testing.T) { var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg) // 启动多个消费者wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}

输出如下:

=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)

可能部分数据的输出顺序和单个消费者的数据输出顺序有差异,因为消费者也是各自的独立协程,所以在获取数据并输出的顺序会有差异。

更多相关: