channel原理和应用

channel 是一个数据管道,是 goroutine 之间数据通信桥梁,是线程安全的。channel分为有缓冲和无缓冲两种类型,其实无缓冲类型可以理解为有缓冲的一种特殊情况。

1 channel工作原理

源码 go/src/runtime/chan.go

type hchan struct {
    qcount   uint           // 当前队列中剩余元素个数  
    dataqsiz uint           // 环形队列长度,即缓冲区的大小,即make(chan T,N),N.
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 每个元素的大小 
    closed   uint32         // 表示当前通道是否处于关闭状态。创建通道后,该字段设置为0,即通道打开; 通过调用close将其设置为1,通道关闭。 
    elemtype *_type         // 元素类型,用于数据传递过程中的赋值; 
    sendx    uint           // 环形缓冲区的状态字段,它指向环形队列当前发送索引
    recvx    uint           // 环形缓冲区的状态字段,它指向环形队列当前接收索引
    recvq    waitq          // 等待读消息的goroutine队列 
    sendq    waitq          // 等待写消息的goroutine队列 
    lock     mutex          // 互斥锁,为每个读写操作锁定通道,因为发送和接收必须是互斥操作
}

从结构体看核心字段是环形队列buf,而qcount、dataqsiz、sendx、recvx是维护buf状态字段,recvq和sendq是存放发送接收channel阻塞的goroutine队列,也是间接维护buf的,lock是整个结构体的锁,避免不同goroutine读写数据竞争,因此channel是并发安全的。


1.1 发送数据到channel

环形队列buf状态 goroutine状态
buf未满 有一个goroutine发送数据,不会阻塞,把发送的数据填充到环形队列buf空闲位置,按环形队列索引顺序填充数据
buf已满 有一个goroutine发送数据,环形队列buf没地方存放了,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列sendq,等环形队列buf被消费后有空闲的位置,从sendq队列(先进先出)唤醒goroutine恢复现场,把发送的数据填充到环形队列空闲位置

buf未满,发送数据

buf已满


发送流程图:

发送流程图


1.2 从channel接收数据

环形队列buf状态 当前goroutine状态
buf有数据 有一个goroutine接收数据,不会阻塞,按环形队列索引顺序消费数据,消费一个数据,环形队列buf就空出一个位置
buf为空 有一个goroutine接收数据,goroutine阻塞等待,把该goroutine的现场保存下来,存放到发送goroutine队列recvq,等环形队列buf有新的数据后,从recvq队列(先进先出)唤醒goroutine恢复现场,消费buf的数据

channel有读有写

buf为空


接收流程图:

接收流程图


1.3 goroutine挂起与唤醒

goroutine挂起时调用gopark函数,唤醒调用goready函数,一般是成对出现。

  • channel发送挂起,⼀定是由channel接收端(或close)唤醒。
  • channel接收挂起,⼀定是由channel发送(或close)唤醒。
  • 当主动closechannel时,同时唤醒channel发送和接收队列的所有goroutine。


1.4 channel操作方式

操作 channel 一般有如下三种方式:

  • 读 <-ch
  • 写 ch<-
  • 关闭 close(ch)
操作 nil的channel 正常channel 已关闭的channel
读 <-ch 阻塞 成功或阻塞 读到零值
写 ch<- 阻塞 成功或阻塞 panic
关闭 close(ch) panic 成功 panic


可以使用for range接收管道数据

for v := range ch { // 一直循环等待读取数据,直到关闭通道退出循环
    fmt.Println(v)
}

可以使用select接收管道数据或发送数据到管道

var count int
for {
    select { // 可以用select作为接发送或接收选择器
    case v, ok := <-ch1: // 当关闭通道后,如果有数据也会读出来
        if !ok { // 判断通道是否已经关闭
            fmt.Println("通道已经关闭")
            return
        }
        fmt.Println(v)

    case ch2<-count: // 发送数据
    }
    count++
}


2 channel应用

2.1 信息交流

channel 的底层是一个循环队列,当队列的长度大于0的时候,可以在队列中缓存数据信息,向一个 goroutine存放数据,从一个 goroutine读取数据,就像水管的两头,这样就实现了goroutine之间的消息交流。

示例代码:

// InfoExchange 读取信息
func InfoExchange(ctx context.Context, in <-chan interface{}) {
    for {
        select {
        case v, ok := <-in:
            if !ok {
                fmt.Println("\n信息传递结束")
                return
            }
            fmt.Printf("%v ", v)

        case <-ctx.Done():
            return
        }
    }
}


测试代码:

func genInfo() <-chan interface{} {
    in := make(chan interface{}, 5)

    go func() {
        defer close(in)
        for i := 0; i < 10; i++ {
            in <- i
            time.Sleep(time.Millisecond * 500)
        }
    }()

    return in
}

func TestInfoExchange(t *testing.T) {
    in := genInfo()

    delay := time.Second * 5
    ctx, _ := context.WithTimeout(context.Background(), delay)
    go InfoExchange(ctx, in)

    <-time.After(delay)
}

/*
结果:

0 1 2 3 4 5 6 7 8 9 
信息传递结束
*/


2.2 数据传递

数据传递类似游戏“击鼓传花”。鼓响时,花(或者其它物件)从一个人手里传到下一个人,数据就类似这里的花

示例代码:

// DataTransfer 数据传递,从chan读取数据,并传递给下一个chan
func DataTransfer(id int, n int, chans []chan interface{}) {
    for {
        token := <-chans[id]
        fmt.Printf("id=%d, v=%v \n", id, token)

        chans[(id+1)%n] <- token
        time.Sleep(time.Second)
    }
}


测试代码:

func TestStartTask(t *testing.T) {
    n := 4
    chans := []chan interface{}{}

    for i := 0; i < n; i++ {
        chans = append(chans, make(chan interface{}, 1))
    }

    for i := 0; i < n; i++ {
        go DataTransfer(i, n, chans)
    }

    // 初始化数据
    chans[0] <- "a"
    chans[1] <- "b"
    chans[2] <- "c"
    chans[3] <- "d"

    <-time.After(time.Second * 5)
}

/*
结果:

id=3, v=d 
id=0, v=a 
id=1, v=b 
id=2, v=c 
id=1, v=a 
id=0, v=d 
id=3, v=c 
id=2, v=b 
...
*/


2.3 信号通知

使用非缓冲channel的特性,当channel没有数据接收时会阻塞,直到有新的数据进来或者 channel 被关闭才会退出阻塞,因此可以作为信号通知。

示例代码:

quit := make(chan os.Signal)  
signal.Notify(quit, os.Interrupt)  
<-quit


2.4 锁

示例代码:

// Mutex 使用chan实现互斥锁
type Mutex struct {
    ch chan struct{}
}

// NewMutex 使用锁需要初始化
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// Lock 请求锁,直到获取到
func (m *Mutex) Lock() {
    <-m.ch
}

// Unlock 解锁
func (m *Mutex) Unlock() {
    select {
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// TryLock 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:
    }
    return false
}

// LockTimeout 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    select {
    case <-m.ch:
        timer.Stop()
        return true
    case <-timer.C:
    }
    return false
}

// IsLocked 锁是否已被持有
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}


测试代码:

func TestMutex_TryLock(t *testing.T) {
    m := NewMutex()

    for i := 0; i < 5; i++ {
        go func(i int) {
            if m.TryLock() {
                fmt.Printf("NO %d get lock success\n", i)
            } else {
                fmt.Printf("NO %d get lock failed\n", i)
            }
        }(i)
    }

    time.Sleep(time.Millisecond)
}

/*
结果:

NO 4 get lock success
NO 2 get lock failed
NO 3 get lock failed
NO 0 get lock failed
NO 1 get lock failed
*/

func TestMutex_Lock(t *testing.T) {
    a := -1
    m := NewMutex()

    for i := 0; i < 5; i++ {
        go func(i int) {
            for {
                m.Lock()
                a = i
                time.Sleep(time.Millisecond * 100)
                fmt.Println("a =", a)
                m.Unlock()
            }

        }(i)
    }

    time.Sleep(time.Second)
}


/*
结果:

a = 4
a = 2
a = 3
a = 0
a = 1
...
*/


2.5 任务编排

2.5.1 or-Done 模式

有n 个任务,其中任意一个完成就算完成,这叫or-Done 模式,

使用场景:用户查询请求,同时发两次给集群服务,取最快返回,使用冗余请求增加体验。

示例代码:

// OrDone 任意一个channel完成就退出
func OrDone(channels ...<-chan interface{}) <-chan interface{} { // <1>

    switch len(channels) {
    case 0: // <2>
        return nil
    case 1: // <3>
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() { // <4>
        defer close(orDone)

        switch len(channels) {
        case 2: // <5>
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: // <6>
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-OrDone(append(channels[3:], orDone)...): // <6>
            }
        }
    }()
    return orDone
}


测试:

func done(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        fmt.Println("delay:", after)
        time.Sleep(after)
    }()
    return c
}

// 随机1~10秒
func randTime() time.Duration {
    n := time.Duration(rand.Int31n(10))
    return n * time.Second
}

func TestOrDone(t *testing.T) {

    <-OrDone(
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
        done(randTime()),
    )
}

/*
结果:

delay: 1s
delay: 1s
delay: 8s
delay: 7s
delay: 7s
delay: 9s
*/


2.5.2 扇入模式

多个结果组合到一个channel中的过程叫扇入模式下,输入源有多个,输出目标只有一个。

示例代码:

// FanIn 扇入,多个channel组合到一个channel
func FanIn(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo( // 对多个数据进行合并处理
            FanIn(chans[:m]...),
            FanIn(chans[m:]...))
    }
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要还有可读的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已关闭,设置为nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已关闭,设置为nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}


测试:

func done(v int) <-chan interface{} {
    in := make(chan interface{})
    go func() {
        defer close(in)
        in <- v
        time.Sleep(time.Millisecond * 500)
    }()
    return in
}

func TestFanIn(t *testing.T) {
    out := FanIn(
        done(1),
        done(2),
        done(3),
    )

    for v := range out {
        fmt.Println(v)
    }
}

/*
结果:

1
3
2
*/


2.5.3 扇出模式

扇出模式(Fan-Out)只有一个输入源,但是有多个输出目标。从源 channel 取出一个数据后,依次发送给多个目标 channel。发送的时候,既可以同步,也可以异步。

示例代码:

// FanOut 扇出,只有一个输入源,但是有多个输出目标
func FanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中,同步方式
                }
            }
        }
    }()
}


测试:

func TestFanOut(t *testing.T) {
    ch := make(chan interface{})
    chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}

    FanOut(ch, chLister, false)

    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println(<-chLister[0], <-chLister[1], <-chLister[2])
        time.Sleep(time.Second)
    }
}

/*
结果:

0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
*/


2.5.4 stream

stream 是把 channel 当做流式管道的方式。

// AsStream 将一个 slice 转成流
func AsStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() {                 // 启动一个goroutine,往s中塞数据
        defer close(s)             // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
            case <-done:
                return
            case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}

// TakeN 获取流的前n个数据
func TakeN(done <-chan struct{}, valueStream <-chan interface{}, n int) <-chan interface{} {
    takeStream := make(chan interface{}) // 创建输出流
    go func() {
        defer close(takeStream)
        for i := 0; i < n; i++ { // 只读取前num个元素
            select {
            case <-done:
                return
            case takeStream <- <-valueStream: //从输入流中读取元素
            }
        }
    }()
    return takeStream
}


2.5.5 map-reduce

map-reduce 是一种面向大规模数据处理的并行计算模型和方法,但是这里要介绍的是一种单机版的 map-reduce 模式。

map-reduce 分为两个步骤,第一步是 map,将队列中的数据用 mapFn 函数处理;第二步是 reduce,将处理后的数据用 reduceFn 函数汇总。

// MapChan 处理mapFn处理数据
func MapChan(in <-chan interface{}, mapFn func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{}) // 创建一个输出chan
    if in == nil {                // 异常检查
        close(out)
        return out
    }

    go func() { // 启动一个goroutine,实现map的主要逻辑
        defer close(out)
        for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
            out <- mapFn(v)
        }
    }()

    return out
}

// Reduce  reduceFn函数汇总
func Reduce(in <-chan interface{}, reduceFn func(r, v interface{}) interface{}) interface{} {
    if in == nil { // 异常检查
        return nil
    }

    out := <-in         // 先读取第一个元素
    for v := range in { // 实现reduce的主要逻辑
        out = reduceFn(out, v)
    }

    return out
}


测试:

// 需求:将一组数据中每个数据乘以10,最后计算总和。为此,我们需要实现 mapFn (乘 10) 和 reduceFn (求和)

// 生成一个数据流
func numStream(done <-chan struct{}) <-chan interface{} {
    s := make(chan interface{})
    values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    go func() {
        defer close(s)
        for _, v := range values { // 从数组生成
            select {
            case <-done:
                return
            case s <- v:
            }
        }
    }()
    return s
}

func TestMapReduce(t *testing.T) {
    in := numStream(nil)

    // map操作: 乘以10
    mapFn := func(v interface{}) interface{} {
        return v.(int) * 10
    }

    // reduce操作: 对map的结果进行累加
    reduceFn := func(r, v interface{}) interface{} {
        return r.(int) + v.(int)
    }

    sum := Reduce(MapChan(in, mapFn), reduceFn) //返回累加结果
    fmt.Println(sum)
}


2.6 worker模式

2.6.1 最简单的worker处理队列方式
package main

import "time"

type Job int

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        // 顺序执行,缺点:如果处理过程中有等待或阻塞,会影响整个队列
        Process(job)

        // 并发执行,如果处理过程中有等待或阻塞,不会影响其他的job,
        // 缺点:并发处理job的goroutine数量不可控,每来一个新job就会启动一个goroutine,不建议这样处理。
        // 通常做法是开启有限个worker的goroutine来并行处理队列的job,而不是在process里并发执行。
        //go Process(job)
    }
}

func Process(job Job) {
    if job == 3 { // 等于3的这个job阻塞1s
        time.Sleep(time.Second)
    }
    println("job", job)
}

func main() {
    // make a channel with a capacity of 10.
    jobChan := make(chan Job, 10)

    // start the worker
    go worker(jobChan)

    // enqueue a job
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    time.Sleep(2 * time.Second)
}

/* 当次执行的结果如下:

(1) 顺序执行,在处理job 3时阻塞了一秒,其他job要等job 3处理完毕后再处理往下执行。
job 0
job 1
job 2
job 3
job 4
job 5
job 6
job 7
job 8
job 9
job 10
job 11
job 12
job 13
job 14
job 15
job 16
job 17
job 18
job 19

------------------------------------------------------

(2) 并发process执行,在处理job 3时阻塞了一秒,不影响其他的job处理。
job 2
job 10
job 0
job 6
job 4
job 5
job 9
job 7
job 1
job 8
job 13
job 11
job 12
job 19
job 17
job 18
job 14
job 15
job 16
job 3
*/


2.6.2 使用worker池处理队列
package main

import (
    "fmt"
    "time"
)

type Job int

func worker(i int, jobChan <-chan Job) {
    for job := range jobChan {
        Process(i, job)
    }
}

func Process(i int, job Job) {
    if job == 3 {
        time.Sleep(time.Second)
    }
    fmt.Printf("worker %2d process job %d\n", i, job)
}

func workPool(workerSize int, jobChan chan Job) {
    for i := 0; i < workerSize; i++ {
        go func(i int) {
            worker(i, jobChan)
        }(i)
    }
}

func main() {
    jobChan := make(chan Job, 10)

    // 启动多个worker池并发处理队列的job,多个worker去抢队列job来处理,只有空闲的worker才能从队列中获取job
    workPool(5, jobChan)

    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    time.Sleep(2 * time.Second)
}

/*当次执行结果如下:

可以看到worker 2处理job 3时阻塞一秒,worker 2在阻塞过程中没有去抢占队列新的job来处理。
worker  4 process job 0
worker  3 process job 4
worker  0 process job 2
worker  3 process job 6
worker  3 process job 8
worker  3 process job 9
worker  3 process job 10
worker  3 process job 11
worker  3 process job 12
worker  3 process job 13
worker  3 process job 14
worker  3 process job 15
worker  3 process job 16
worker  3 process job 17
worker  3 process job 18
worker  1 process job 1
worker  4 process job 5
worker  0 process job 7
worker  3 process job 19
worker  2 process job 3
*/


2.6.3 等待worker处理所有队列的job
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job int

func worker(n int, jobChan <-chan Job, wg *sync.WaitGroup) {
    for job := range jobChan {
        Process(n, job, wg)
    }
}

func Process(n int, job Job, wg *sync.WaitGroup) {
    defer wg.Done()

    // 加上随机延时,方便查看打印效果
    size := randTime()
    time.Sleep(size * time.Millisecond)

    fmt.Printf("worker %2d process job %d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})

    go func() {
        wg.Wait()
        close(ch)
    }()

    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        return false
    }
}

func workPool(workerNum int, jobChan chan Job, wg *sync.WaitGroup) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, wg)
        }(i)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    jobChan := make(chan Job, 10)

    workPool(5, jobChan, wg)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        jobChan <- Job(i)
    }

    t := time.Now()

    // 等待所有job处理完毕才退出,不足:不管是顺序执行还是并发process方式处理job,如果其中有一个job阻塞了,会一直等待下去
    wg.Wait()

    // 带超时等待,如果超时了,直接忽略等待
    //ok := WaitTimeout(wg, 300*time.Millisecond)
    //if !ok {
    //  fmt.Printf("\n warning, process job timeout \n")
    //}

    fmt.Printf("\n handle queue time %v\n", time.Now().Sub(t))
}

/* 当次执行结果如下:

(1) 不带超时的wait
worker  2 process job 2, time 147ms
worker  0 process job 0, time 147ms
worker  1 process job 1, time 147ms
worker  3 process job 4, time 147ms
worker  4 process job 3, time 147ms
worker  1 process job 7, time 282ms
worker  4 process job 9, time 282ms
worker  0 process job 6, time 282ms
worker  3 process job 8, time 282ms
worker  2 process job 5, time 399ms
worker  1 process job 10, time 149ms
worker  0 process job 12, time 149ms
worker  3 process job 13, time 149ms
worker  4 process job 11, time 149ms
worker  2 process job 14, time 145ms
worker  0 process job 16, time 265ms
worker  1 process job 15, time 265ms
worker  3 process job 17, time 265ms
worker  4 process job 18, time 265ms
worker  2 process job 19, time 234ms

 handle queue time 786.8609ms

------------------------------------------------

(2) 有超时的wait
worker  3 process job 3, time 257ms
worker  1 process job 1, time 272ms
worker  0 process job 0, time 272ms
worker  4 process job 4, time 272ms
worker  2 process job 2, time 272ms
worker  4 process job 8, time 102ms
worker  3 process job 5, time 224ms

 warning, process job timeout

 handle queue time 300.3939ms
*/


2.6.4 使用context或channel停止worker

有两种方式停止处理队列中的worker,分别是context和channel,多级函数传递或复杂点的控制建议使用context。

(1) 使用context停止worker

package main

import (
    "fmt"
    "math/rand"
    "time"

    "golang.org/x/net/context"
)

type Job int

func workPool(workerNum int, jobChan chan Job, ctx context.Context) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ctx)
        }(i)
    }
}

// 通过context取消未完成的job
func worker(n int, jobChan <-chan Job, ctx context.Context) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)

        case <-ctx.Done():
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}

func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func main() {
    jobChan := make(chan Job, 10)

    // 使用context控制worker是否停止,适合多级函数传递和控制,并且有超时取消
    ctx, cancel := context.WithCancel(context.Background())
    //ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 可以设置延时的context
    workPool(5, jobChan, ctx)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    cancel()
    time.Sleep(5 * time.Second)
}

/* 当次执行结果如下:
worker  2 process job  3, time 471ms
worker  3 process job  4, time 471ms
worker  1 process job  1, time 471ms
worker  4 process job  0, time 471ms
worker  0 process job  2, time 471ms
worker  3 process job  6, time 200ms
cancel worker 3
worker  4 process job  8, time 200ms
worker  2 process job  5, time 200ms
cancel worker 2
worker  0 process job  9, time 200ms
cancel worker 0 by context
worker  1 process job  7, time 200ms
cancel worker 1
worker  4 process job 10, time 310ms
cancel worker 4
*/


(2)使用channel停止worker

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Job int

func workPool(workerNum int, jobChan chan Job, ch chan struct{}) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ch)
        }(i)
    }
}

// 通过channel取消未完成的job
func worker(n int, jobChan <-chan Job, ch chan struct{}) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)

        case <-ch:
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}

func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func main() {
    jobChan := make(chan Job, 10)

    // 使用channel控制worker是否停止
    ch := make(chan struct{})
    workPool(5, jobChan, ch)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    close(ch)
    time.Sleep(5 * time.Second)

}

/* 当次执行结果如下:
worker  1 process job  1, time 313ms
worker  3 process job  4, time 313ms
worker  2 process job  3, time 313ms
worker  0 process job  2, time 313ms
worker  4 process job  0, time 313ms
worker  3 process job  6, time 258ms
cancel worker 3
worker  2 process job  7, time 258ms
worker  4 process job  9, time 258ms
cancel worker 4
worker  0 process job  8, time 258ms
worker  1 process job  5, time 258ms
cancel worker 1
worker  0 process job 11, time 190ms
cancel worker 0
worker  2 process job 10, time 226ms
worker  2 process job 12, time 485ms
cancel worker 2
*/


2.7 关闭事件跟踪器

package main

import (
    "context"
    "fmt"
    "time"
)

// Tracker 跟踪器  
type Tracker struct {  
   ch   chan string  
   stop chan struct{}  
}  
  
// NewTracker 实例化 func NewTracker() *Tracker {  
   return &Tracker{  
      ch:   make(chan string, 20),  
      stop: make(chan struct{}),  
   }  
}  
  
// Event 触发事件  
func (t *Tracker) Event(ctx context.Context, data string) error {  
   select {  
   case t.ch <- data:  
      return nil  
   case <-ctx.Done():  
      return ctx.Err()  
   }  
}  
  
// Run 执行  
func (t *Tracker) Run() {  
   for data := range t.ch {  
      fmt.Println(data)  
      time.Sleep(time.Second * 10)  
   }  
   t.stop <- struct{}{}  
}  
  
// Shutdown 关闭  
func (t *Tracker) Shutdown(ctx context.Context) {  
   close(t.ch)  
   select {  
   case <-t.stop:  
      fmt.Println("正常结束")  
   case <-ctx.Done():  
      fmt.Println("超时结束")  
   }  
}

func main() {  
   ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))  
   defer cancel() 
    
   tr := NewTracker()  
   go tr.Run()  
  
   tr.Event(ctx, "data1")  
   tr.Event(ctx, "data2")  
   tr.Event(ctx, "data3")  
  
   tr.Shutdown(ctx)

/*
执行结果:

data1
超时结束
*/
}


3 注意事项

未初始化的channel,读取里面的数据时,会造成死锁deadlock

var ch chan int
<-ch  // 未初始化channel读数据会死锁`


未初始化的channel,往里面写数据时,会造成死锁deadlock

var ch chan int
ch<-  // 未初始化channel写数据会死锁


未初始化的channel,关闭该channel时,会panic

var ch chan int
close(ch) // 关闭未初始化channel,触发panic


向已关闭的channel写数据,会pannic

var ch =make(chan int)
close(ch) 
ch<-1 // channel已关闭,触发panic


专题「golang相关」的其它文章 »