channel 是一个数据管道,是 goroutine 之间数据通信桥梁,是线程安全的。channel分为有缓冲和无缓冲两种类型,其实无缓冲类型可以理解为有缓冲的一种特殊情况。
1 channel工作原理
源码 go/src/runtime/chan.go
type hchan struct {
    qcount   uint           // 当前队列中剩余元素个数  
    dataqsiz uint           // 环形队列长度,即缓冲区的大小,即make(chan T,N),N.
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 每个元素的大小 
    closed   uint32         // 表示当前通道是否处于关闭状态。创建通道后,该字段设置为0,即通道打开; 通过调用close将其设置为1,通道关闭。 
    elemtype *_type         // 元素类型,用于数据传递过程中的赋值; 
    sendx    uint           // 环形缓冲区的状态字段,它指向环形队列当前发送索引
    recvx    uint           // 环形缓冲区的状态字段,它指向环形队列当前接收索引
    recvq    waitq          // 等待读消息的goroutine队列 
    sendq    waitq          // 等待写消息的goroutine队列 
    lock     mutex          // 互斥锁,为每个读写操作锁定通道,因为发送和接收必须是互斥操作
}
从结构体看核心字段是环形队列buf,而qcount、dataqsiz、sendx、recvx是维护buf状态字段,recvq和sendq是存放发送接收channel阻塞的goroutine队列,也是间接维护buf的,lock是整个结构体的锁,避免不同goroutine读写数据竞争,因此channel是并发安全的。
1.1 发送数据到channel
| 环形队列buf状态 | goroutine状态 | 
|---|---|
| buf未满 | 有一个goroutine发送数据,不会阻塞,把发送的数据填充到环形队列buf空闲位置,按环形队列索引顺序填充数据 | 
| buf已满 | 有一个goroutine发送数据,环形队列buf没地方存放了,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列sendq,等环形队列buf被消费后有空闲的位置,从sendq队列(先进先出)唤醒goroutine恢复现场,把发送的数据填充到环形队列空闲位置 | 


发送流程图:

1.2 从channel接收数据
| 环形队列buf状态 | 当前goroutine状态 | 
|---|---|
| buf有数据 | 有一个goroutine接收数据,不会阻塞,按环形队列索引顺序消费数据,消费一个数据,环形队列buf就空出一个位置 | 
| buf为空 | 有一个goroutine接收数据,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列recvq,等环形队列buf有新的数据后,从recvq队列(先进先出)唤醒goroutine恢复现场,消费buf的数据 | 


接收流程图:

1.3 goroutine挂起与唤醒
goroutine挂起时调用gopark函数,唤醒调用goready函数,一般是成对出现。
- channel发送挂起,⼀定是由channel接收端(或close)唤醒。
 - channel接收挂起,⼀定是由channel发送(或close)唤醒。
 - 当主动closechannel时,同时唤醒channel发送和接收队列的所有goroutine。
 
1.4 channel操作方式
操作 channel 一般有如下三种方式:
- 读 <-ch
 - 写 ch<-
 - 关闭 close(ch)
 
| 操作 | nil的channel | 正常channel | 已关闭的channel | 
|---|---|---|---|
| 读 <-ch | 阻塞 | 成功或阻塞 | 读到零值 | 
| 写 ch<- | 阻塞 | 成功或阻塞 | panic | 
| 关闭 close(ch) | panic | 成功 | panic | 
可以使用for range接收管道数据
for v := range ch { // 一直循环等待读取数据,直到关闭通道退出循环
    fmt.Println(v)
}
可以使用select接收管道数据或发送数据到管道
var count int
for {
    select { // 可以用select作为接发送或接收选择器
    case v, ok := <-ch1: // 当关闭通道后,如果有数据也会读出来
        if !ok { // 判断通道是否已经关闭
            fmt.Println("通道已经关闭")
            return
        }
        fmt.Println(v)
    case ch2<-count: // 发送数据
    }
    count++
}
2 channel应用
2.1 信息交流
channel 的底层是一个循环队列,当队列的长度大于0的时候,可以在队列中缓存数据信息,向一个 goroutine存放数据,从一个 goroutine读取数据,就像水管的两头,这样就实现了goroutine之间的消息交流。
示例代码:
// InfoExchange 读取信息
func InfoExchange(ctx context.Context, in <-chan interface{}) {
    for {
        select {
        case v, ok := <-in:
            if !ok {
                fmt.Println("\n信息传递结束")
                return
            }
            fmt.Printf("%v ", v)
        case <-ctx.Done():
            return
        }
    }
}
测试代码:
func genInfo() <-chan interface{} {
    in := make(chan interface{}, 5)
    go func() {
        defer close(in)
        for i := 0; i < 10; i++ {
            in <- i
            time.Sleep(time.Millisecond * 500)
        }
    }()
    return in
}
func TestInfoExchange(t *testing.T) {
    in := genInfo()
    delay := time.Second * 5
    ctx, _ := context.WithTimeout(context.Background(), delay)
    go InfoExchange(ctx, in)
    <-time.After(delay)
}
/*
结果:
0 1 2 3 4 5 6 7 8 9 
信息传递结束
*/
2.2 数据传递
数据传递类似游戏“击鼓传花”。鼓响时,花(或者其它物件)从一个人手里传到下一个人,数据就类似这里的花
示例代码:
// DataTransfer 数据传递,从chan读取数据,并传递给下一个chan
func DataTransfer(id int, n int, chans []chan interface{}) {
    for {
        token := <-chans[id]
        fmt.Printf("id=%d, v=%v \n", id, token)
        chans[(id+1)%n] <- token
        time.Sleep(time.Second)
    }
}
测试代码:
func TestStartTask(t *testing.T) {
    n := 4
    chans := []chan interface{}{}
    for i := 0; i < n; i++ {
        chans = append(chans, make(chan interface{}, 1))
    }
    for i := 0; i < n; i++ {
        go DataTransfer(i, n, chans)
    }
    // 初始化数据
    chans[0] <- "a"
    chans[1] <- "b"
    chans[2] <- "c"
    chans[3] <- "d"
    <-time.After(time.Second * 5)
}
/*
结果:
id=3, v=d 
id=0, v=a 
id=1, v=b 
id=2, v=c 
id=1, v=a 
id=0, v=d 
id=3, v=c 
id=2, v=b 
...
*/
2.3 信号通知
使用非缓冲channel的特性,当channel没有数据接收时会阻塞,直到有新的数据进来或者 channel 被关闭才会退出阻塞,因此可以作为信号通知。
示例代码:
quit := make(chan os.Signal)  
signal.Notify(quit, os.Interrupt)  
<-quit
2.4 锁
示例代码:
// Mutex 使用chan实现互斥锁
type Mutex struct {
    ch chan struct{}
}
// NewMutex 使用锁需要初始化
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}
// Lock 请求锁,直到获取到
func (m *Mutex) Lock() {
    <-m.ch
}
// Unlock 解锁
func (m *Mutex) Unlock() {
    select {
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}
// TryLock 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:
    }
    return false
}
// LockTimeout 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    select {
    case <-m.ch:
        timer.Stop()
        return true
    case <-timer.C:
    }
    return false
}
// IsLocked 锁是否已被持有
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}
测试代码:
func TestMutex_TryLock(t *testing.T) {
    m := NewMutex()
    for i := 0; i < 5; i++ {
        go func(i int) {
            if m.TryLock() {
                fmt.Printf("NO %d get lock success\n", i)
            } else {
                fmt.Printf("NO %d get lock failed\n", i)
            }
        }(i)
    }
    time.Sleep(time.Millisecond)
}
/*
结果:
NO 4 get lock success
NO 2 get lock failed
NO 3 get lock failed
NO 0 get lock failed
NO 1 get lock failed
*/
func TestMutex_Lock(t *testing.T) {
    a := -1
    m := NewMutex()
    for i := 0; i < 5; i++ {
        go func(i int) {
            for {
                m.Lock()
                a = i
                time.Sleep(time.Millisecond * 100)
                fmt.Println("a =", a)
                m.Unlock()
            }
        }(i)
    }
    time.Sleep(time.Second)
}
/*
结果:
a = 4
a = 2
a = 3
a = 0
a = 1
...
*/
2.5 任务编排
2.5.1 or-Done 模式
有n 个任务,其中任意一个完成就算完成,这叫or-Done 模式,
使用场景:用户查询请求,同时发两次给集群服务,取最快返回,使用冗余请求增加体验。
示例代码:
// OrDone 任意一个channel完成就退出
func OrDone(channels ...<-chan interface{}) <-chan interface{} { // <1>
    switch len(channels) {
    case 0: // <2>
        return nil
    case 1: // <3>
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() { // <4>
        defer close(orDone)
        switch len(channels) {
        case 2: // <5>
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: // <6>
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-OrDone(append(channels[3:], orDone)...): // <6>
            }
        }
    }()
    return orDone
}
测试:
func done(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        fmt.Println("delay:", after)
        time.Sleep(after)
    }()
    return c
}
// 随机1~10秒
func randTime() time.Duration {
    n := time.Duration(rand.Int31n(10))
    return n * time.Second
}
func TestOrDone(t *testing.T) {
    <-OrDone(
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
    )
}
/*
结果:
delay: 1s
delay: 1s
delay: 8s
delay: 7s
delay: 7s
delay: 9s
*/
2.5.2 扇入模式
多个结果组合到一个channel中的过程叫扇入模式下,输入源有多个,输出目标只有一个。
示例代码:
// FanIn 扇入,多个channel组合到一个channel
func FanIn(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo( // 对多个数据进行合并处理
            FanIn(chans[:m]...),
            FanIn(chans[m:]...))
    }
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要还有可读的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已关闭,设置为nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已关闭,设置为nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}
测试:
func done(v int) <-chan interface{} {
    in := make(chan interface{})
    go func() {
        defer close(in)
        in <- v
        time.Sleep(time.Millisecond * 500)
    }()
    return in
}
func TestFanIn(t *testing.T) {
    out := FanIn(
        done(1),
        done(2),
        done(3),
    )
    for v := range out {
        fmt.Println(v)
    }
}
/*
结果:
1
3
2
*/
2.5.3 扇出模式
扇出模式(Fan-Out)只有一个输入源,但是有多个输出目标。从源 channel 取出一个数据后,依次发送给多个目标 channel。发送的时候,既可以同步,也可以异步。
示例代码:
// FanOut 扇出,只有一个输入源,但是有多个输出目标
func FanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()
        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中,同步方式
                }
            }
        }
    }()
}
测试:
func TestFanOut(t *testing.T) {
    ch := make(chan interface{})
    chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
    FanOut(ch, chLister, false)
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println(<-chLister[0], <-chLister[1], <-chLister[2])
        time.Sleep(time.Second)
    }
}
/*
结果:
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
*/
2.5.4 stream
stream 是把 channel 当做流式管道的方式。
// AsStream 将一个 slice 转成流
func AsStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() {                 // 启动一个goroutine,往s中塞数据
        defer close(s)             // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
            case <-done:
                return
            case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}
// TakeN 获取流的前n个数据
func TakeN(done <-chan struct{}, valueStream <-chan interface{}, n int) <-chan interface{} {
    takeStream := make(chan interface{}) // 创建输出流
    go func() {
        defer close(takeStream)
        for i := 0; i < n; i++ { // 只读取前num个元素
            select {
            case <-done:
                return
            case takeStream <- <-valueStream: //从输入流中读取元素
            }
        }
    }()
    return takeStream
}
2.5.5 map-reduce
map-reduce 是一种面向大规模数据处理的并行计算模型和方法,但是这里要介绍的是一种单机版的 map-reduce 模式。
map-reduce 分为两个步骤,第一步是 map,将队列中的数据用 mapFn 函数处理;第二步是 reduce,将处理后的数据用 reduceFn 函数汇总。
// MapChan 处理mapFn处理数据
func MapChan(in <-chan interface{}, mapFn func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{}) // 创建一个输出chan
    if in == nil {                // 异常检查
        close(out)
        return out
    }
    go func() { // 启动一个goroutine,实现map的主要逻辑
        defer close(out)
        for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
            out <- mapFn(v)
        }
    }()
    return out
}
// Reduce  reduceFn函数汇总
func Reduce(in <-chan interface{}, reduceFn func(r, v interface{}) interface{}) interface{} {
    if in == nil { // 异常检查
        return nil
    }
    out := <-in         // 先读取第一个元素
    for v := range in { // 实现reduce的主要逻辑
        out = reduceFn(out, v)
    }
    return out
}
测试:
// 需求:将一组数据中每个数据乘以10,最后计算总和。为此,我们需要实现 mapFn (乘 10) 和 reduceFn (求和)
// 生成一个数据流
func numStream(done <-chan struct{}) <-chan interface{} {
    s := make(chan interface{})
    values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    go func() {
        defer close(s)
        for _, v := range values { // 从数组生成
            select {
            case <-done:
                return
            case s <- v:
            }
        }
    }()
    return s
}
func TestMapReduce(t *testing.T) {
    in := numStream(nil)
    // map操作: 乘以10
    mapFn := func(v interface{}) interface{} {
        return v.(int) * 10
    }
    // reduce操作: 对map的结果进行累加
    reduceFn := func(r, v interface{}) interface{} {
        return r.(int) + v.(int)
    }
    sum := Reduce(MapChan(in, mapFn), reduceFn) //返回累加结果
    fmt.Println(sum)
}
2.6 worker模式
2.6.1 最简单的worker处理队列方式
package main
import "time"
type Job int
func worker(jobChan <-chan Job) {
    for job := range jobChan {
        // 顺序执行,缺点:如果处理过程中有等待或阻塞,会影响整个队列
        Process(job)
        // 并发执行,如果处理过程中有等待或阻塞,不会影响其他的job,
        // 缺点:并发处理job的goroutine数量不可控,每来一个新job就会启动一个goroutine,不建议这样处理。
        // 通常做法是开启有限个worker的goroutine来并行处理队列的job,而不是在process里并发执行。
        //go Process(job)
    }
}
func Process(job Job) {
    if job == 3 { // 等于3的这个job阻塞1s
        time.Sleep(time.Second)
    }
    println("job", job)
}
func main() {
    // make a channel with a capacity of 10.
    jobChan := make(chan Job, 10)
    // start the worker
    go worker(jobChan)
    // enqueue a job
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }
    time.Sleep(2 * time.Second)
}
/* 当次执行的结果如下:
(1) 顺序执行,在处理job 3时阻塞了一秒,其他job要等job 3处理完毕后再处理往下执行。
job 0
job 1
job 2
job 3
job 4
job 5
job 6
job 7
job 8
job 9
job 10
job 11
job 12
job 13
job 14
job 15
job 16
job 17
job 18
job 19
------------------------------------------------------
(2) 并发process执行,在处理job 3时阻塞了一秒,不影响其他的job处理。
job 2
job 10
job 0
job 6
job 4
job 5
job 9
job 7
job 1
job 8
job 13
job 11
job 12
job 19
job 17
job 18
job 14
job 15
job 16
job 3
*/
2.6.2 使用worker池处理队列
package main
import (
    "fmt"
    "time"
)
type Job int
func worker(i int, jobChan <-chan Job) {
    for job := range jobChan {
        Process(i, job)
    }
}
func Process(i int, job Job) {
    if job == 3 {
        time.Sleep(time.Second)
    }
    fmt.Printf("worker %2d process job %d\n", i, job)
}
func workPool(workerSize int, jobChan chan Job) {
    for i := 0; i < workerSize; i++ {
        go func(i int) {
            worker(i, jobChan)
        }(i)
    }
}
func main() {
    jobChan := make(chan Job, 10)
    // 启动多个worker池并发处理队列的job,多个worker去抢队列job来处理,只有空闲的worker才能从队列中获取job
    workPool(5, jobChan)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }
    time.Sleep(2 * time.Second)
}
/*当次执行结果如下:
可以看到worker 2处理job 3时阻塞一秒,worker 2在阻塞过程中没有去抢占队列新的job来处理。
worker  4 process job 0
worker  3 process job 4
worker  0 process job 2
worker  3 process job 6
worker  3 process job 8
worker  3 process job 9
worker  3 process job 10
worker  3 process job 11
worker  3 process job 12
worker  3 process job 13
worker  3 process job 14
worker  3 process job 15
worker  3 process job 16
worker  3 process job 17
worker  3 process job 18
worker  1 process job 1
worker  4 process job 5
worker  0 process job 7
worker  3 process job 19
worker  2 process job 3
*/
2.6.3 等待worker处理所有队列的job
package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
type Job int
func worker(n int, jobChan <-chan Job, wg *sync.WaitGroup) {
    for job := range jobChan {
        Process(n, job, wg)
    }
}
func Process(n int, job Job, wg *sync.WaitGroup) {
    defer wg.Done()
    // 加上随机延时,方便查看打印效果
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        return false
    }
}
func workPool(workerNum int, jobChan chan Job, wg *sync.WaitGroup) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, wg)
        }(i)
    }
}
func main() {
    wg := &sync.WaitGroup{}
    jobChan := make(chan Job, 10)
    workPool(5, jobChan, wg)
    for i := 0; i < 20; i++ {
        wg.Add(1)
        jobChan <- Job(i)
    }
    t := time.Now()
    // 等待所有job处理完毕才退出,不足:不管是顺序执行还是并发process方式处理job,如果其中有一个job阻塞了,会一直等待下去
    wg.Wait()
    // 带超时等待,如果超时了,直接忽略等待
    //ok := WaitTimeout(wg, 300*time.Millisecond)
    //if !ok {
    //  fmt.Printf("\n warning, process job timeout \n")
    //}
    fmt.Printf("\n handle queue time %v\n", time.Now().Sub(t))
}
/* 当次执行结果如下:
(1) 不带超时的wait
worker  2 process job 2, time 147ms
worker  0 process job 0, time 147ms
worker  1 process job 1, time 147ms
worker  3 process job 4, time 147ms
worker  4 process job 3, time 147ms
worker  1 process job 7, time 282ms
worker  4 process job 9, time 282ms
worker  0 process job 6, time 282ms
worker  3 process job 8, time 282ms
worker  2 process job 5, time 399ms
worker  1 process job 10, time 149ms
worker  0 process job 12, time 149ms
worker  3 process job 13, time 149ms
worker  4 process job 11, time 149ms
worker  2 process job 14, time 145ms
worker  0 process job 16, time 265ms
worker  1 process job 15, time 265ms
worker  3 process job 17, time 265ms
worker  4 process job 18, time 265ms
worker  2 process job 19, time 234ms
 handle queue time 786.8609ms
------------------------------------------------
(2) 有超时的wait
worker  3 process job 3, time 257ms
worker  1 process job 1, time 272ms
worker  0 process job 0, time 272ms
worker  4 process job 4, time 272ms
worker  2 process job 2, time 272ms
worker  4 process job 8, time 102ms
worker  3 process job 5, time 224ms
 warning, process job timeout
 handle queue time 300.3939ms
*/
2.6.4 使用context或channel停止worker
有两种方式停止处理队列中的worker,分别是context和channel,多级函数传递或复杂点的控制建议使用context。
(1) 使用context停止worker
package main
import (
    "fmt"
    "math/rand"
    "time"
    "golang.org/x/net/context"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ctx context.Context) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ctx)
        }(i)
    }
}
// 通过context取消未完成的job
func worker(n int, jobChan <-chan Job, ctx context.Context) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)
        case <-ctx.Done():
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}
func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}
func main() {
    jobChan := make(chan Job, 10)
    // 使用context控制worker是否停止,适合多级函数传递和控制,并且有超时取消
    ctx, cancel := context.WithCancel(context.Background())
    //ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 可以设置延时的context
    workPool(5, jobChan, ctx)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }
    cancel()
    time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker  2 process job  3, time 471ms
worker  3 process job  4, time 471ms
worker  1 process job  1, time 471ms
worker  4 process job  0, time 471ms
worker  0 process job  2, time 471ms
worker  3 process job  6, time 200ms
cancel worker 3
worker  4 process job  8, time 200ms
worker  2 process job  5, time 200ms
cancel worker 2
worker  0 process job  9, time 200ms
cancel worker 0 by context
worker  1 process job  7, time 200ms
cancel worker 1
worker  4 process job 10, time 310ms
cancel worker 4
*/
(2)使用channel停止worker
package main
import (
    "fmt"
    "math/rand"
    "time"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ch chan struct{}) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ch)
        }(i)
    }
}
// 通过channel取消未完成的job
func worker(n int, jobChan <-chan Job, ch chan struct{}) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)
        case <-ch:
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}
func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}
func main() {
    jobChan := make(chan Job, 10)
    // 使用channel控制worker是否停止
    ch := make(chan struct{})
    workPool(5, jobChan, ch)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }
    close(ch)
    time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker  1 process job  1, time 313ms
worker  3 process job  4, time 313ms
worker  2 process job  3, time 313ms
worker  0 process job  2, time 313ms
worker  4 process job  0, time 313ms
worker  3 process job  6, time 258ms
cancel worker 3
worker  2 process job  7, time 258ms
worker  4 process job  9, time 258ms
cancel worker 4
worker  0 process job  8, time 258ms
worker  1 process job  5, time 258ms
cancel worker 1
worker  0 process job 11, time 190ms
cancel worker 0
worker  2 process job 10, time 226ms
worker  2 process job 12, time 485ms
cancel worker 2
*/
2.7 关闭事件跟踪器
package main
import (
    "context"
    "fmt"
    "time"
)
// Tracker 跟踪器  
type Tracker struct {  
   ch   chan string  
   stop chan struct{}  
}  
  
// NewTracker 实例化 func NewTracker() *Tracker {  
   return &Tracker{  
      ch:   make(chan string, 20),  
      stop: make(chan struct{}),  
   }  
}  
  
// Event 触发事件  
func (t *Tracker) Event(ctx context.Context, data string) error {  
   select {  
   case t.ch <- data:  
      return nil  
   case <-ctx.Done():  
      return ctx.Err()  
   }  
}  
  
// Run 执行  
func (t *Tracker) Run() {  
   for data := range t.ch {  
      fmt.Println(data)  
      time.Sleep(time.Second * 10)  
   }  
   t.stop <- struct{}{}  
}  
  
// Shutdown 关闭  
func (t *Tracker) Shutdown(ctx context.Context) {  
   close(t.ch)  
   select {  
   case <-t.stop:  
      fmt.Println("正常结束")  
   case <-ctx.Done():  
      fmt.Println("超时结束")  
   }  
}
func main() {  
   ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))  
   defer cancel() 
    
   tr := NewTracker()  
   go tr.Run()  
  
   tr.Event(ctx, "data1")  
   tr.Event(ctx, "data2")  
   tr.Event(ctx, "data3")  
  
   tr.Shutdown(ctx)
/*
执行结果:
data1
超时结束
*/
}
3 注意事项
未初始化的channel,读取里面的数据时,会造成死锁deadlock
var ch chan int
<-ch  // 未初始化channel读数据会死锁`
未初始化的channel,往里面写数据时,会造成死锁deadlock
var ch chan int
ch<-  // 未初始化channel写数据会死锁
未初始化的channel,关闭该channel时,会panic
var ch chan int
close(ch) // 关闭未初始化channel,触发panic
向已关闭的channel写数据,会pannic
var ch =make(chan int)
close(ch) 
ch<-1 // channel已关闭,触发panic
专题「golang相关」的其它文章 »
- DeepSeek与Sponge黄金组合打造后端高效开发新范式 (Feb 09, 2025)
 - 使用开发框架sponge快速把单体web服务拆分为微服务 (Sep 18, 2023)
 - 使用开发框架sponge一天多开发完成一个简单版社区后端服务 (Jul 30, 2023)
 - 一个强大的Go开发框架sponge,以低代码方式开发项目 (Jan 06, 2023)
 - go test命令 (Apr 15, 2022)
 - go应用程序性能分析 (Mar 29, 2022)
 - go runtime (Mar 14, 2022)
 - go调试工具 (Mar 13, 2022)
 - cobra基础与实践 (Mar 10, 2022)
 - grpc基础与实践 (Nov 27, 2020)
 - 配置文件viper库 (Nov 22, 2020)
 - 根据服务名称查看golang程序的profile信息 (Sep 03, 2019)
 - go语言开发规范 (Aug 28, 2019)
 - goroutine和channel应用——处理队列 (Sep 06, 2018)
 - golang中的context包 (Aug 28, 2018)