channel 是一个数据管道,是 goroutine 之间数据通信桥梁,是线程安全的。channel分为有缓冲和无缓冲两种类型,其实无缓冲类型可以理解为有缓冲的一种特殊情况。
1 channel工作原理
源码 go/src/runtime/chan.go
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即缓冲区的大小,即make(chan T,N),N.
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 表示当前通道是否处于关闭状态。创建通道后,该字段设置为0,即通道打开; 通过调用close将其设置为1,通道关闭。
elemtype *_type // 元素类型,用于数据传递过程中的赋值;
sendx uint // 环形缓冲区的状态字段,它指向环形队列当前发送索引
recvx uint // 环形缓冲区的状态字段,它指向环形队列当前接收索引
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,为每个读写操作锁定通道,因为发送和接收必须是互斥操作
}
从结构体看核心字段是环形队列buf,而qcount、dataqsiz、sendx、recvx是维护buf状态字段,recvq和sendq是存放发送接收channel阻塞的goroutine队列,也是间接维护buf的,lock是整个结构体的锁,避免不同goroutine读写数据竞争,因此channel是并发安全的。
1.1 发送数据到channel
环形队列buf状态 | goroutine状态 |
---|---|
buf未满 | 有一个goroutine发送数据,不会阻塞,把发送的数据填充到环形队列buf空闲位置,按环形队列索引顺序填充数据 |
buf已满 | 有一个goroutine发送数据,环形队列buf没地方存放了,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列sendq ,等环形队列buf被消费后有空闲的位置,从sendq 队列(先进先出)唤醒goroutine恢复现场,把发送的数据填充到环形队列空闲位置 |
发送流程图:
1.2 从channel接收数据
环形队列buf状态 | 当前goroutine状态 |
---|---|
buf有数据 | 有一个goroutine接收数据,不会阻塞,按环形队列索引顺序消费数据,消费一个数据,环形队列buf就空出一个位置 |
buf为空 | 有一个goroutine接收数据,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列recvq ,等环形队列buf有新的数据后,从recvq 队列(先进先出)唤醒goroutine恢复现场,消费buf的数据 |
接收流程图:
1.3 goroutine挂起与唤醒
goroutine挂起时调用gopark函数,唤醒调用goready函数,一般是成对出现。
- channel发送挂起,⼀定是由channel接收端(或close)唤醒。
- channel接收挂起,⼀定是由channel发送(或close)唤醒。
- 当主动closechannel时,同时唤醒channel发送和接收队列的所有goroutine。
1.4 channel操作方式
操作 channel 一般有如下三种方式:
- 读 <-ch
- 写 ch<-
- 关闭 close(ch)
操作 | nil的channel | 正常channel | 已关闭的channel |
---|---|---|---|
读 <-ch | 阻塞 | 成功或阻塞 | 读到零值 |
写 ch<- | 阻塞 | 成功或阻塞 | panic |
关闭 close(ch) | panic | 成功 | panic |
可以使用for range接收管道数据
for v := range ch { // 一直循环等待读取数据,直到关闭通道退出循环
fmt.Println(v)
}
可以使用select接收管道数据或发送数据到管道
var count int
for {
select { // 可以用select作为接发送或接收选择器
case v, ok := <-ch1: // 当关闭通道后,如果有数据也会读出来
if !ok { // 判断通道是否已经关闭
fmt.Println("通道已经关闭")
return
}
fmt.Println(v)
case ch2<-count: // 发送数据
}
count++
}
2 channel应用
2.1 信息交流
channel 的底层是一个循环队列,当队列的长度大于0的时候,可以在队列中缓存数据信息,向一个 goroutine存放数据,从一个 goroutine读取数据,就像水管的两头,这样就实现了goroutine之间的消息交流。
示例代码:
// InfoExchange 读取信息
func InfoExchange(ctx context.Context, in <-chan interface{}) {
for {
select {
case v, ok := <-in:
if !ok {
fmt.Println("\n信息传递结束")
return
}
fmt.Printf("%v ", v)
case <-ctx.Done():
return
}
}
}
测试代码:
func genInfo() <-chan interface{} {
in := make(chan interface{}, 5)
go func() {
defer close(in)
for i := 0; i < 10; i++ {
in <- i
time.Sleep(time.Millisecond * 500)
}
}()
return in
}
func TestInfoExchange(t *testing.T) {
in := genInfo()
delay := time.Second * 5
ctx, _ := context.WithTimeout(context.Background(), delay)
go InfoExchange(ctx, in)
<-time.After(delay)
}
/*
结果:
0 1 2 3 4 5 6 7 8 9
信息传递结束
*/
2.2 数据传递
数据传递类似游戏“击鼓传花”。鼓响时,花(或者其它物件)从一个人手里传到下一个人,数据就类似这里的花
示例代码:
// DataTransfer 数据传递,从chan读取数据,并传递给下一个chan
func DataTransfer(id int, n int, chans []chan interface{}) {
for {
token := <-chans[id]
fmt.Printf("id=%d, v=%v \n", id, token)
chans[(id+1)%n] <- token
time.Sleep(time.Second)
}
}
测试代码:
func TestStartTask(t *testing.T) {
n := 4
chans := []chan interface{}{}
for i := 0; i < n; i++ {
chans = append(chans, make(chan interface{}, 1))
}
for i := 0; i < n; i++ {
go DataTransfer(i, n, chans)
}
// 初始化数据
chans[0] <- "a"
chans[1] <- "b"
chans[2] <- "c"
chans[3] <- "d"
<-time.After(time.Second * 5)
}
/*
结果:
id=3, v=d
id=0, v=a
id=1, v=b
id=2, v=c
id=1, v=a
id=0, v=d
id=3, v=c
id=2, v=b
...
*/
2.3 信号通知
使用非缓冲channel的特性,当channel没有数据接收时会阻塞,直到有新的数据进来或者 channel 被关闭才会退出阻塞,因此可以作为信号通知。
示例代码:
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
<-quit
2.4 锁
示例代码:
// Mutex 使用chan实现互斥锁
type Mutex struct {
ch chan struct{}
}
// NewMutex 使用锁需要初始化
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// Lock 请求锁,直到获取到
func (m *Mutex) Lock() {
<-m.ch
}
// Unlock 解锁
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// TryLock 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// LockTimeout 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
// IsLocked 锁是否已被持有
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
测试代码:
func TestMutex_TryLock(t *testing.T) {
m := NewMutex()
for i := 0; i < 5; i++ {
go func(i int) {
if m.TryLock() {
fmt.Printf("NO %d get lock success\n", i)
} else {
fmt.Printf("NO %d get lock failed\n", i)
}
}(i)
}
time.Sleep(time.Millisecond)
}
/*
结果:
NO 4 get lock success
NO 2 get lock failed
NO 3 get lock failed
NO 0 get lock failed
NO 1 get lock failed
*/
func TestMutex_Lock(t *testing.T) {
a := -1
m := NewMutex()
for i := 0; i < 5; i++ {
go func(i int) {
for {
m.Lock()
a = i
time.Sleep(time.Millisecond * 100)
fmt.Println("a =", a)
m.Unlock()
}
}(i)
}
time.Sleep(time.Second)
}
/*
结果:
a = 4
a = 2
a = 3
a = 0
a = 1
...
*/
2.5 任务编排
2.5.1 or-Done 模式
有n 个任务,其中任意一个完成就算完成,这叫or-Done 模式,
使用场景:用户查询请求,同时发两次给集群服务,取最快返回,使用冗余请求增加体验。
示例代码:
// OrDone 任意一个channel完成就退出
func OrDone(channels ...<-chan interface{}) <-chan interface{} { // <1>
switch len(channels) {
case 0: // <2>
return nil
case 1: // <3>
return channels[0]
}
orDone := make(chan interface{})
go func() { // <4>
defer close(orDone)
switch len(channels) {
case 2: // <5>
select {
case <-channels[0]:
case <-channels[1]:
}
default: // <6>
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-OrDone(append(channels[3:], orDone)...): // <6>
}
}
}()
return orDone
}
测试:
func done(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
fmt.Println("delay:", after)
time.Sleep(after)
}()
return c
}
// 随机1~10秒
func randTime() time.Duration {
n := time.Duration(rand.Int31n(10))
return n * time.Second
}
func TestOrDone(t *testing.T) {
<-OrDone(
done(randTime()),
done(randTime()),
done(randTime()),
done(randTime()),
done(randTime()),
done(randTime()),
)
}
/*
结果:
delay: 1s
delay: 1s
delay: 8s
delay: 7s
delay: 7s
delay: 9s
*/
2.5.2 扇入模式
多个结果组合到一个channel中的过程叫扇入模式下,输入源有多个,输出目标只有一个。
示例代码:
// FanIn 扇入,多个channel组合到一个channel
func FanIn(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo( // 对多个数据进行合并处理
FanIn(chans[:m]...),
FanIn(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { //只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
测试:
func done(v int) <-chan interface{} {
in := make(chan interface{})
go func() {
defer close(in)
in <- v
time.Sleep(time.Millisecond * 500)
}()
return in
}
func TestFanIn(t *testing.T) {
out := FanIn(
done(1),
done(2),
done(3),
)
for v := range out {
fmt.Println(v)
}
}
/*
结果:
1
3
2
*/
2.5.3 扇出模式
扇出模式(Fan-Out)只有一个输入源,但是有多个输出目标。从源 channel 取出一个数据后,依次发送给多个目标 channel。发送的时候,既可以同步,也可以异步。
示例代码:
// FanOut 扇出,只有一个输入源,但是有多个输出目标
func FanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出时关闭所有的输出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //异步
go func() {
out[i] <- v // 放入到输出chan中,异步方式
}()
} else {
out[i] <- v // 放入到输出chan中,同步方式
}
}
}
}()
}
测试:
func TestFanOut(t *testing.T) {
ch := make(chan interface{})
chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
FanOut(ch, chLister, false)
for i := 0; i < 5; i++ {
ch <- i
fmt.Println(<-chLister[0], <-chLister[1], <-chLister[2])
time.Sleep(time.Second)
}
}
/*
结果:
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
*/
2.5.4 stream
stream 是把 channel 当做流式管道的方式。
// AsStream 将一个 slice 转成流
func AsStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
s := make(chan interface{}) //创建一个unbuffered的channel
go func() { // 启动一个goroutine,往s中塞数据
defer close(s) // 退出时关闭chan
for _, v := range values { // 遍历数组
select {
case <-done:
return
case s <- v: // 将数组元素塞入到chan中
}
}
}()
return s
}
// TakeN 获取流的前n个数据
func TakeN(done <-chan struct{}, valueStream <-chan interface{}, n int) <-chan interface{} {
takeStream := make(chan interface{}) // 创建输出流
go func() {
defer close(takeStream)
for i := 0; i < n; i++ { // 只读取前num个元素
select {
case <-done:
return
case takeStream <- <-valueStream: //从输入流中读取元素
}
}
}()
return takeStream
}
2.5.5 map-reduce
map-reduce 是一种面向大规模数据处理的并行计算模型和方法,但是这里要介绍的是一种单机版的 map-reduce 模式。
map-reduce 分为两个步骤,第一步是 map,将队列中的数据用 mapFn 函数处理;第二步是 reduce,将处理后的数据用 reduceFn 函数汇总。
// MapChan 处理mapFn处理数据
func MapChan(in <-chan interface{}, mapFn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{}) // 创建一个输出chan
if in == nil { // 异常检查
close(out)
return out
}
go func() { // 启动一个goroutine,实现map的主要逻辑
defer close(out)
for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
out <- mapFn(v)
}
}()
return out
}
// Reduce reduceFn函数汇总
func Reduce(in <-chan interface{}, reduceFn func(r, v interface{}) interface{}) interface{} {
if in == nil { // 异常检查
return nil
}
out := <-in // 先读取第一个元素
for v := range in { // 实现reduce的主要逻辑
out = reduceFn(out, v)
}
return out
}
测试:
// 需求:将一组数据中每个数据乘以10,最后计算总和。为此,我们需要实现 mapFn (乘 10) 和 reduceFn (求和)
// 生成一个数据流
func numStream(done <-chan struct{}) <-chan interface{} {
s := make(chan interface{})
values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
go func() {
defer close(s)
for _, v := range values { // 从数组生成
select {
case <-done:
return
case s <- v:
}
}
}()
return s
}
func TestMapReduce(t *testing.T) {
in := numStream(nil)
// map操作: 乘以10
mapFn := func(v interface{}) interface{} {
return v.(int) * 10
}
// reduce操作: 对map的结果进行累加
reduceFn := func(r, v interface{}) interface{} {
return r.(int) + v.(int)
}
sum := Reduce(MapChan(in, mapFn), reduceFn) //返回累加结果
fmt.Println(sum)
}
2.6 worker模式
2.6.1 最简单的worker处理队列方式
package main
import "time"
type Job int
func worker(jobChan <-chan Job) {
for job := range jobChan {
// 顺序执行,缺点:如果处理过程中有等待或阻塞,会影响整个队列
Process(job)
// 并发执行,如果处理过程中有等待或阻塞,不会影响其他的job,
// 缺点:并发处理job的goroutine数量不可控,每来一个新job就会启动一个goroutine,不建议这样处理。
// 通常做法是开启有限个worker的goroutine来并行处理队列的job,而不是在process里并发执行。
//go Process(job)
}
}
func Process(job Job) {
if job == 3 { // 等于3的这个job阻塞1s
time.Sleep(time.Second)
}
println("job", job)
}
func main() {
// make a channel with a capacity of 10.
jobChan := make(chan Job, 10)
// start the worker
go worker(jobChan)
// enqueue a job
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
time.Sleep(2 * time.Second)
}
/* 当次执行的结果如下:
(1) 顺序执行,在处理job 3时阻塞了一秒,其他job要等job 3处理完毕后再处理往下执行。
job 0
job 1
job 2
job 3
job 4
job 5
job 6
job 7
job 8
job 9
job 10
job 11
job 12
job 13
job 14
job 15
job 16
job 17
job 18
job 19
------------------------------------------------------
(2) 并发process执行,在处理job 3时阻塞了一秒,不影响其他的job处理。
job 2
job 10
job 0
job 6
job 4
job 5
job 9
job 7
job 1
job 8
job 13
job 11
job 12
job 19
job 17
job 18
job 14
job 15
job 16
job 3
*/
2.6.2 使用worker池处理队列
package main
import (
"fmt"
"time"
)
type Job int
func worker(i int, jobChan <-chan Job) {
for job := range jobChan {
Process(i, job)
}
}
func Process(i int, job Job) {
if job == 3 {
time.Sleep(time.Second)
}
fmt.Printf("worker %2d process job %d\n", i, job)
}
func workPool(workerSize int, jobChan chan Job) {
for i := 0; i < workerSize; i++ {
go func(i int) {
worker(i, jobChan)
}(i)
}
}
func main() {
jobChan := make(chan Job, 10)
// 启动多个worker池并发处理队列的job,多个worker去抢队列job来处理,只有空闲的worker才能从队列中获取job
workPool(5, jobChan)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
time.Sleep(2 * time.Second)
}
/*当次执行结果如下:
可以看到worker 2处理job 3时阻塞一秒,worker 2在阻塞过程中没有去抢占队列新的job来处理。
worker 4 process job 0
worker 3 process job 4
worker 0 process job 2
worker 3 process job 6
worker 3 process job 8
worker 3 process job 9
worker 3 process job 10
worker 3 process job 11
worker 3 process job 12
worker 3 process job 13
worker 3 process job 14
worker 3 process job 15
worker 3 process job 16
worker 3 process job 17
worker 3 process job 18
worker 1 process job 1
worker 4 process job 5
worker 0 process job 7
worker 3 process job 19
worker 2 process job 3
*/
2.6.3 等待worker处理所有队列的job
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job int
func worker(n int, jobChan <-chan Job, wg *sync.WaitGroup) {
for job := range jobChan {
Process(n, job, wg)
}
}
func Process(n int, job Job, wg *sync.WaitGroup) {
defer wg.Done()
// 加上随机延时,方便查看打印效果
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}
func workPool(workerNum int, jobChan chan Job, wg *sync.WaitGroup) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, wg)
}(i)
}
}
func main() {
wg := &sync.WaitGroup{}
jobChan := make(chan Job, 10)
workPool(5, jobChan, wg)
for i := 0; i < 20; i++ {
wg.Add(1)
jobChan <- Job(i)
}
t := time.Now()
// 等待所有job处理完毕才退出,不足:不管是顺序执行还是并发process方式处理job,如果其中有一个job阻塞了,会一直等待下去
wg.Wait()
// 带超时等待,如果超时了,直接忽略等待
//ok := WaitTimeout(wg, 300*time.Millisecond)
//if !ok {
// fmt.Printf("\n warning, process job timeout \n")
//}
fmt.Printf("\n handle queue time %v\n", time.Now().Sub(t))
}
/* 当次执行结果如下:
(1) 不带超时的wait
worker 2 process job 2, time 147ms
worker 0 process job 0, time 147ms
worker 1 process job 1, time 147ms
worker 3 process job 4, time 147ms
worker 4 process job 3, time 147ms
worker 1 process job 7, time 282ms
worker 4 process job 9, time 282ms
worker 0 process job 6, time 282ms
worker 3 process job 8, time 282ms
worker 2 process job 5, time 399ms
worker 1 process job 10, time 149ms
worker 0 process job 12, time 149ms
worker 3 process job 13, time 149ms
worker 4 process job 11, time 149ms
worker 2 process job 14, time 145ms
worker 0 process job 16, time 265ms
worker 1 process job 15, time 265ms
worker 3 process job 17, time 265ms
worker 4 process job 18, time 265ms
worker 2 process job 19, time 234ms
handle queue time 786.8609ms
------------------------------------------------
(2) 有超时的wait
worker 3 process job 3, time 257ms
worker 1 process job 1, time 272ms
worker 0 process job 0, time 272ms
worker 4 process job 4, time 272ms
worker 2 process job 2, time 272ms
worker 4 process job 8, time 102ms
worker 3 process job 5, time 224ms
warning, process job timeout
handle queue time 300.3939ms
*/
2.6.4 使用context或channel停止worker
有两种方式停止处理队列中的worker,分别是context和channel,多级函数传递或复杂点的控制建议使用context。
(1) 使用context停止worker
package main
import (
"fmt"
"math/rand"
"time"
"golang.org/x/net/context"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ctx context.Context) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, ctx)
}(i)
}
}
// 通过context取消未完成的job
func worker(n int, jobChan <-chan Job, ctx context.Context) {
for {
select {
case job := <-jobChan:
Process(n, job)
case <-ctx.Done():
fmt.Printf("cancel worker %d\n", n)
return
}
}
}
func Process(n int, job Job) {
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func main() {
jobChan := make(chan Job, 10)
// 使用context控制worker是否停止,适合多级函数传递和控制,并且有超时取消
ctx, cancel := context.WithCancel(context.Background())
//ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 可以设置延时的context
workPool(5, jobChan, ctx)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
cancel()
time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker 2 process job 3, time 471ms
worker 3 process job 4, time 471ms
worker 1 process job 1, time 471ms
worker 4 process job 0, time 471ms
worker 0 process job 2, time 471ms
worker 3 process job 6, time 200ms
cancel worker 3
worker 4 process job 8, time 200ms
worker 2 process job 5, time 200ms
cancel worker 2
worker 0 process job 9, time 200ms
cancel worker 0 by context
worker 1 process job 7, time 200ms
cancel worker 1
worker 4 process job 10, time 310ms
cancel worker 4
*/
(2)使用channel停止worker
package main
import (
"fmt"
"math/rand"
"time"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ch chan struct{}) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, ch)
}(i)
}
}
// 通过channel取消未完成的job
func worker(n int, jobChan <-chan Job, ch chan struct{}) {
for {
select {
case job := <-jobChan:
Process(n, job)
case <-ch:
fmt.Printf("cancel worker %d\n", n)
return
}
}
}
func Process(n int, job Job) {
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func main() {
jobChan := make(chan Job, 10)
// 使用channel控制worker是否停止
ch := make(chan struct{})
workPool(5, jobChan, ch)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
close(ch)
time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker 1 process job 1, time 313ms
worker 3 process job 4, time 313ms
worker 2 process job 3, time 313ms
worker 0 process job 2, time 313ms
worker 4 process job 0, time 313ms
worker 3 process job 6, time 258ms
cancel worker 3
worker 2 process job 7, time 258ms
worker 4 process job 9, time 258ms
cancel worker 4
worker 0 process job 8, time 258ms
worker 1 process job 5, time 258ms
cancel worker 1
worker 0 process job 11, time 190ms
cancel worker 0
worker 2 process job 10, time 226ms
worker 2 process job 12, time 485ms
cancel worker 2
*/
2.7 关闭事件跟踪器
package main
import (
"context"
"fmt"
"time"
)
// Tracker 跟踪器
type Tracker struct {
ch chan string
stop chan struct{}
}
// NewTracker 实例化 func NewTracker() *Tracker {
return &Tracker{
ch: make(chan string, 20),
stop: make(chan struct{}),
}
}
// Event 触发事件
func (t *Tracker) Event(ctx context.Context, data string) error {
select {
case t.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Run 执行
func (t *Tracker) Run() {
for data := range t.ch {
fmt.Println(data)
time.Sleep(time.Second * 10)
}
t.stop <- struct{}{}
}
// Shutdown 关闭
func (t *Tracker) Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
fmt.Println("正常结束")
case <-ctx.Done():
fmt.Println("超时结束")
}
}
func main() {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))
defer cancel()
tr := NewTracker()
go tr.Run()
tr.Event(ctx, "data1")
tr.Event(ctx, "data2")
tr.Event(ctx, "data3")
tr.Shutdown(ctx)
/*
执行结果:
data1
超时结束
*/
}
3 注意事项
未初始化的channel,读取里面的数据时,会造成死锁deadlock
var ch chan int
<-ch // 未初始化channel读数据会死锁`
未初始化的channel,往里面写数据时,会造成死锁deadlock
var ch chan int
ch<- // 未初始化channel写数据会死锁
未初始化的channel,关闭该channel时,会panic
var ch chan int
close(ch) // 关闭未初始化channel,触发panic
向已关闭的channel写数据,会pannic
var ch =make(chan int)
close(ch)
ch<-1 // channel已关闭,触发panic
专题「golang相关」的其它文章 »
- 使用开发框架sponge快速把单体web服务拆分为微服务 (Sep 18, 2023)
- 使用开发框架sponge一天多开发完成一个简单版社区后端服务 (Jul 30, 2023)
- sponge —— 一个强大的go开发框架,以 (Jan 06, 2023)
- go test命令 (Apr 15, 2022)
- go应用程序性能分析 (Mar 29, 2022)
- go runtime (Mar 14, 2022)
- go调试工具 (Mar 13, 2022)
- cobra (Mar 10, 2022)
- grpc使用实践 (Nov 27, 2020)
- 配置文件viper库 (Nov 22, 2020)
- 根据服务名称查看golang程序的profile信息 (Sep 03, 2019)
- go语言开发规范 (Aug 28, 2019)
- goroutine和channel应用——处理队列 (Sep 06, 2018)
- golang中的context包 (Aug 28, 2018)