goroutine和channel应用——处理队列

1 最简单的worker处理队列方式

package main

import "time"

type Job int

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        // 顺序执行,缺点:如果处理过程中有等待或阻塞,会影响整个队列
        Process(job)

        // 并发执行,如果处理过程中有等待或阻塞,不会影响其他的job,
        // 缺点:并发处理job的goroutine数量不可控,每来一个新job就会启动一个goroutine,不建议这样处理。
        // 通常做法是开启有限个worker的goroutine来并行处理队列的job,而不是在process里并发执行。
        //go Process(job)
    }
}

func Process(job Job) {
    if job == 3 { // 等于3的这个job阻塞1s
        time.Sleep(time.Second)
    }
    println("job", job)
}

func main() {
    // make a channel with a capacity of 10.
    jobChan := make(chan Job, 10)

    // start the worker
    go worker(jobChan)

    // enqueue a job
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    time.Sleep(2 * time.Second)
}

/* 当次执行的结果如下:

(1) 顺序执行,在处理job 3时阻塞了一秒,其他job要等job 3处理完毕后再处理往下执行。
job 0
job 1
job 2
job 3
job 4
job 5
job 6
job 7
job 8
job 9
job 10
job 11
job 12
job 13
job 14
job 15
job 16
job 17
job 18
job 19

------------------------------------------------------

(2) 并发process执行,在处理job 3时阻塞了一秒,不影响其他的job处理。
job 2
job 10
job 0
job 6
job 4
job 5
job 9
job 7
job 1
job 8
job 13
job 11
job 12
job 19
job 17
job 18
job 14
job 15
job 16
job 3
*/



2 使用worker池处理队列

package main

import (
    "fmt"
    "time"
)

type Job int

func worker(i int, jobChan <-chan Job) {
    for job := range jobChan {
        Process(i, job)
    }
}

func Process(i int, job Job) {
    if job == 3 {
        time.Sleep(time.Second)
    }
    fmt.Printf("worker %2d process job %d\n", i, job)
}

func workPool(workerNum int, jobChan chan Job) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan)
        }(i)
    }
}

func main() {
    jobChan := make(chan Job, 10)

    // 启动多个worker池并发处理队列的job,多个worker去抢队列job来处,只有空闲的worker才能从队列中获取job
    workPool(5, jobChan)

    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    time.Sleep(2 * time.Second)
}

/*当次执行结果如下:

可以看到worker 2处理job 3时阻塞一秒,worker 2在阻塞过程中没有去抢占队列新的job来处理。
worker  4 process job 0
worker  3 process job 4
worker  0 process job 2
worker  3 process job 6
worker  3 process job 8
worker  3 process job 9
worker  3 process job 10
worker  3 process job 11
worker  3 process job 12
worker  3 process job 13
worker  3 process job 14
worker  3 process job 15
worker  3 process job 16
worker  3 process job 17
worker  3 process job 18
worker  1 process job 1
worker  4 process job 5
worker  0 process job 7
worker  3 process job 19
worker  2 process job 3
*/



3 等待worker处理所有队列的job

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job int

func worker(n int, jobChan <-chan Job, wg *sync.WaitGroup) {
    for job := range jobChan {
        Process(n, job, wg)
    }
}

func Process(n int, job Job, wg *sync.WaitGroup) {
    defer wg.Done()

    // 加上随机延时,方便查看打印效果
    size := randTime()
    time.Sleep(size * time.Millisecond)

    fmt.Printf("worker %2d process job %d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})

    go func() {
        wg.Wait()
        close(ch)
    }()

    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        return false
    }
}

func workPool(workerNum int, jobChan chan Job, wg *sync.WaitGroup) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, wg)
        }(i)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    jobChan := make(chan Job, 10)

    workPool(5, jobChan, wg)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        jobChan <- Job(i)
    }

    t := time.Now()

    // 等待所有job处理完毕才退出,不足:不管是顺序执行还是并发process方式处理job,如果其中有一个job阻塞了,会一直等待下去
    wg.Wait()

    // 带超时等待,如果超时了,直接忽略等待
    //ok := WaitTimeout(wg, 300*time.Millisecond)
    //if !ok {
    //  fmt.Printf("\n warning, process job timeout \n")
    //}

    fmt.Printf("\n handle queue time %v\n", time.Now().Sub(t))
}

/* 当次执行结果如下:

(1) 不带超时的wait
worker  2 process job 2, time 147ms
worker  0 process job 0, time 147ms
worker  1 process job 1, time 147ms
worker  3 process job 4, time 147ms
worker  4 process job 3, time 147ms
worker  1 process job 7, time 282ms
worker  4 process job 9, time 282ms
worker  0 process job 6, time 282ms
worker  3 process job 8, time 282ms
worker  2 process job 5, time 399ms
worker  1 process job 10, time 149ms
worker  0 process job 12, time 149ms
worker  3 process job 13, time 149ms
worker  4 process job 11, time 149ms
worker  2 process job 14, time 145ms
worker  0 process job 16, time 265ms
worker  1 process job 15, time 265ms
worker  3 process job 17, time 265ms
worker  4 process job 18, time 265ms
worker  2 process job 19, time 234ms

 handle queue time 786.8609ms

------------------------------------------------

(2) 有超时的wait
worker  3 process job 3, time 257ms
worker  1 process job 1, time 272ms
worker  0 process job 0, time 272ms
worker  4 process job 4, time 272ms
worker  2 process job 2, time 272ms
worker  4 process job 8, time 102ms
worker  3 process job 5, time 224ms

 warning, process job timeout

 handle queue time 300.3939ms
*/



4 使用context或channel停止worker

有两种方式停止处理队列中的worker,分别是context和channel,多级函数传递或复杂点的控制建议使用context。

4.1 使用context停止worker

package main

import (
    "fmt"
    "math/rand"
    "time"

    "golang.org/x/net/context"
)

type Job int

func workPool(workerNum int, jobChan chan Job, ctx context.Context) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ctx)
        }(i)
    }
}

// 通过context取消未完成的job
func worker(n int, jobChan <-chan Job, ctx context.Context) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)

        case <-ctx.Done():
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}

func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func main() {
    jobChan := make(chan Job, 10)

    // 使用context控制worker是否停止,适合多级函数传递和控制,并且有超时取消
    ctx, cancel := context.WithCancel(context.Background())
    //ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 可以设置延时的context
    workPool(5, jobChan, ctx)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    cancel()
    time.Sleep(5 * time.Second)
}

/* 当次执行结果如下:
worker  2 process job  3, time 471ms
worker  3 process job  4, time 471ms
worker  1 process job  1, time 471ms
worker  4 process job  0, time 471ms
worker  0 process job  2, time 471ms
worker  3 process job  6, time 200ms
cancel worker 3
worker  4 process job  8, time 200ms
worker  2 process job  5, time 200ms
cancel worker 2
worker  0 process job  9, time 200ms
cancel worker 0 by context
worker  1 process job  7, time 200ms
cancel worker 1
worker  4 process job 10, time 310ms
cancel worker 4
*/


4.2 使用channel停止worker

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Job int

func workPool(workerNum int, jobChan chan Job, ch chan struct{}) {
    for i := 0; i < workerNum; i++ {
        go func(i int) {
            worker(i, jobChan, ch)
        }(i)
    }
}

// 通过channel取消未完成的job
func worker(n int, jobChan <-chan Job, ch chan struct{}) {
    for {
        select {
        case job := <-jobChan:
            Process(n, job)

        case <-ch:
            fmt.Printf("cancel worker %d\n", n)
            return
        }
    }
}

func Process(n int, job Job) {
    size := randTime()
    time.Sleep(size * time.Millisecond)
    fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}

// 随机时间100~500
func randTime() time.Duration {
    rand.Seed(time.Now().UnixNano())
    return time.Duration(rand.Intn(400) + 100)
}

func main() {
    jobChan := make(chan Job, 10)

    // 使用channel控制worker是否停止
    ch := make(chan struct{})
    workPool(5, jobChan, ch)
    for i := 0; i < 20; i++ {
        jobChan <- Job(i)
    }

    close(ch)
    time.Sleep(5 * time.Second)

}

/* 当次执行结果如下:
worker  1 process job  1, time 313ms
worker  3 process job  4, time 313ms
worker  2 process job  3, time 313ms
worker  0 process job  2, time 313ms
worker  4 process job  0, time 313ms
worker  3 process job  6, time 258ms
cancel worker 3
worker  2 process job  7, time 258ms
worker  4 process job  9, time 258ms
cancel worker 4
worker  0 process job  8, time 258ms
worker  1 process job  5, time 258ms
cancel worker 1
worker  0 process job 11, time 190ms
cancel worker 0
worker  2 process job 10, time 226ms
worker  2 process job 12, time 485ms
cancel worker 2
*/


专题「golang相关」的其它文章 »