1 最简单的worker处理队列方式
package main
import "time"
type Job int
func worker(jobChan <-chan Job) {
for job := range jobChan {
// 顺序执行,缺点:如果处理过程中有等待或阻塞,会影响整个队列
Process(job)
// 并发执行,如果处理过程中有等待或阻塞,不会影响其他的job,
// 缺点:并发处理job的goroutine数量不可控,每来一个新job就会启动一个goroutine,不建议这样处理。
// 通常做法是开启有限个worker的goroutine来并行处理队列的job,而不是在process里并发执行。
//go Process(job)
}
}
func Process(job Job) {
if job == 3 { // 等于3的这个job阻塞1s
time.Sleep(time.Second)
}
println("job", job)
}
func main() {
// make a channel with a capacity of 10.
jobChan := make(chan Job, 10)
// start the worker
go worker(jobChan)
// enqueue a job
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
time.Sleep(2 * time.Second)
}
/* 当次执行的结果如下:
(1) 顺序执行,在处理job 3时阻塞了一秒,其他job要等job 3处理完毕后再处理往下执行。
job 0
job 1
job 2
job 3
job 4
job 5
job 6
job 7
job 8
job 9
job 10
job 11
job 12
job 13
job 14
job 15
job 16
job 17
job 18
job 19
------------------------------------------------------
(2) 并发process执行,在处理job 3时阻塞了一秒,不影响其他的job处理。
job 2
job 10
job 0
job 6
job 4
job 5
job 9
job 7
job 1
job 8
job 13
job 11
job 12
job 19
job 17
job 18
job 14
job 15
job 16
job 3
*/
2 使用worker池处理队列
package main
import (
"fmt"
"time"
)
type Job int
func worker(i int, jobChan <-chan Job) {
for job := range jobChan {
Process(i, job)
}
}
func Process(i int, job Job) {
if job == 3 {
time.Sleep(time.Second)
}
fmt.Printf("worker %2d process job %d\n", i, job)
}
func workPool(workerNum int, jobChan chan Job) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan)
}(i)
}
}
func main() {
jobChan := make(chan Job, 10)
// 启动多个worker池并发处理队列的job,多个worker去抢队列job来处,只有空闲的worker才能从队列中获取job
workPool(5, jobChan)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
time.Sleep(2 * time.Second)
}
/*当次执行结果如下:
可以看到worker 2处理job 3时阻塞一秒,worker 2在阻塞过程中没有去抢占队列新的job来处理。
worker 4 process job 0
worker 3 process job 4
worker 0 process job 2
worker 3 process job 6
worker 3 process job 8
worker 3 process job 9
worker 3 process job 10
worker 3 process job 11
worker 3 process job 12
worker 3 process job 13
worker 3 process job 14
worker 3 process job 15
worker 3 process job 16
worker 3 process job 17
worker 3 process job 18
worker 1 process job 1
worker 4 process job 5
worker 0 process job 7
worker 3 process job 19
worker 2 process job 3
*/
3 等待worker处理所有队列的job
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job int
func worker(n int, jobChan <-chan Job, wg *sync.WaitGroup) {
for job := range jobChan {
Process(n, job, wg)
}
}
func Process(n int, job Job, wg *sync.WaitGroup) {
defer wg.Done()
// 加上随机延时,方便查看打印效果
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}
func workPool(workerNum int, jobChan chan Job, wg *sync.WaitGroup) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, wg)
}(i)
}
}
func main() {
wg := &sync.WaitGroup{}
jobChan := make(chan Job, 10)
workPool(5, jobChan, wg)
for i := 0; i < 20; i++ {
wg.Add(1)
jobChan <- Job(i)
}
t := time.Now()
// 等待所有job处理完毕才退出,不足:不管是顺序执行还是并发process方式处理job,如果其中有一个job阻塞了,会一直等待下去
wg.Wait()
// 带超时等待,如果超时了,直接忽略等待
//ok := WaitTimeout(wg, 300*time.Millisecond)
//if !ok {
// fmt.Printf("\n warning, process job timeout \n")
//}
fmt.Printf("\n handle queue time %v\n", time.Now().Sub(t))
}
/* 当次执行结果如下:
(1) 不带超时的wait
worker 2 process job 2, time 147ms
worker 0 process job 0, time 147ms
worker 1 process job 1, time 147ms
worker 3 process job 4, time 147ms
worker 4 process job 3, time 147ms
worker 1 process job 7, time 282ms
worker 4 process job 9, time 282ms
worker 0 process job 6, time 282ms
worker 3 process job 8, time 282ms
worker 2 process job 5, time 399ms
worker 1 process job 10, time 149ms
worker 0 process job 12, time 149ms
worker 3 process job 13, time 149ms
worker 4 process job 11, time 149ms
worker 2 process job 14, time 145ms
worker 0 process job 16, time 265ms
worker 1 process job 15, time 265ms
worker 3 process job 17, time 265ms
worker 4 process job 18, time 265ms
worker 2 process job 19, time 234ms
handle queue time 786.8609ms
------------------------------------------------
(2) 有超时的wait
worker 3 process job 3, time 257ms
worker 1 process job 1, time 272ms
worker 0 process job 0, time 272ms
worker 4 process job 4, time 272ms
worker 2 process job 2, time 272ms
worker 4 process job 8, time 102ms
worker 3 process job 5, time 224ms
warning, process job timeout
handle queue time 300.3939ms
*/
4 使用context或channel停止worker
有两种方式停止处理队列中的worker,分别是context和channel,多级函数传递或复杂点的控制建议使用context。
4.1 使用context停止worker
package main
import (
"fmt"
"math/rand"
"time"
"golang.org/x/net/context"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ctx context.Context) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, ctx)
}(i)
}
}
// 通过context取消未完成的job
func worker(n int, jobChan <-chan Job, ctx context.Context) {
for {
select {
case job := <-jobChan:
Process(n, job)
case <-ctx.Done():
fmt.Printf("cancel worker %d\n", n)
return
}
}
}
func Process(n int, job Job) {
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func main() {
jobChan := make(chan Job, 10)
// 使用context控制worker是否停止,适合多级函数传递和控制,并且有超时取消
ctx, cancel := context.WithCancel(context.Background())
//ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 可以设置延时的context
workPool(5, jobChan, ctx)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
cancel()
time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker 2 process job 3, time 471ms
worker 3 process job 4, time 471ms
worker 1 process job 1, time 471ms
worker 4 process job 0, time 471ms
worker 0 process job 2, time 471ms
worker 3 process job 6, time 200ms
cancel worker 3
worker 4 process job 8, time 200ms
worker 2 process job 5, time 200ms
cancel worker 2
worker 0 process job 9, time 200ms
cancel worker 0 by context
worker 1 process job 7, time 200ms
cancel worker 1
worker 4 process job 10, time 310ms
cancel worker 4
*/
4.2 使用channel停止worker
package main
import (
"fmt"
"math/rand"
"time"
)
type Job int
func workPool(workerNum int, jobChan chan Job, ch chan struct{}) {
for i := 0; i < workerNum; i++ {
go func(i int) {
worker(i, jobChan, ch)
}(i)
}
}
// 通过channel取消未完成的job
func worker(n int, jobChan <-chan Job, ch chan struct{}) {
for {
select {
case job := <-jobChan:
Process(n, job)
case <-ch:
fmt.Printf("cancel worker %d\n", n)
return
}
}
}
func Process(n int, job Job) {
size := randTime()
time.Sleep(size * time.Millisecond)
fmt.Printf("worker %2d process job %2d, time %dms\n", n, job, size)
}
// 随机时间100~500
func randTime() time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Intn(400) + 100)
}
func main() {
jobChan := make(chan Job, 10)
// 使用channel控制worker是否停止
ch := make(chan struct{})
workPool(5, jobChan, ch)
for i := 0; i < 20; i++ {
jobChan <- Job(i)
}
close(ch)
time.Sleep(5 * time.Second)
}
/* 当次执行结果如下:
worker 1 process job 1, time 313ms
worker 3 process job 4, time 313ms
worker 2 process job 3, time 313ms
worker 0 process job 2, time 313ms
worker 4 process job 0, time 313ms
worker 3 process job 6, time 258ms
cancel worker 3
worker 2 process job 7, time 258ms
worker 4 process job 9, time 258ms
cancel worker 4
worker 0 process job 8, time 258ms
worker 1 process job 5, time 258ms
cancel worker 1
worker 0 process job 11, time 190ms
cancel worker 0
worker 2 process job 10, time 226ms
worker 2 process job 12, time 485ms
cancel worker 2
*/
专题「golang相关」的其它文章 »
- 使用开发框架sponge快速把单体web服务拆分为微服务 (Sep 18, 2023)
- 使用开发框架sponge一天多开发完成一个简单版社区后端服务 (Jul 30, 2023)
- sponge —— 一个强大的go开发框架,以 (Jan 06, 2023)
- go test命令 (Apr 15, 2022)
- go应用程序性能分析 (Mar 29, 2022)
- channel原理和应用 (Mar 22, 2022)
- go runtime (Mar 14, 2022)
- go调试工具 (Mar 13, 2022)
- cobra (Mar 10, 2022)
- grpc使用实践 (Nov 27, 2020)
- 配置文件viper库 (Nov 22, 2020)
- 根据服务名称查看golang程序的profile信息 (Sep 03, 2019)
- go语言开发规范 (Aug 28, 2019)
- golang中的context包 (Aug 28, 2018)