rabbitMQ简介
rabbitMQ是一个广泛使用的开源消息队列系统,它实现了高级消息队列协议(AMQP)标准,为分布式应用程序提供了强大的消息传递功能。rabbitMQ是 Erlang 语言编写的,具有高度的可扩展性和可靠性,因此被广泛用于构建分布式、异步的消息通信系统。
消息队列的概念
消息队列是一种通信模式,用于在不同组件、服务或应用程序之间传递消息。它允许发送者将消息放入队列,而接收者可以从队列中获取消息,实现了解耦、异步通信和数据传递的目标。消息队列通常用于处理以下情况:
- 异步通信:发送方和接收方之间不需要立即响应,提高了系统的可伸缩性和性能。
- 任务排队:将需要处理的任务放入队列,由工作进程异步执行。
- 解耦组件:允许不同的应用程序或服务之间进行松耦合的通信。
rabbitMQ 工作模式
- Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。
- Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。
- Fanout Exchange(扇出型交换机): Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列。
- Headers Exchange(头交换机):Headers类型的交换机是通过headers信息来匹配的,工作原理与direct类型类似。
- Delayed Message Exchange(延时交换机):指定一个消息不是立即投递到队列,而是在指定的一段时间后才投递。
rabbitMQ 的核心概念
- Producer(生产者):负责向消息队列发送消息的应用程序或服务。
- Consumer(消费者):负责从消息队列接收和处理消息的应用程序或服务。
- Queue(队列):用于存储消息的缓冲区,消费者从队列中获取消息进行处理。
- Exchange(交换机):接收生产者发送的消息并将其路由到一个或多个队列。
- Binding(绑定):定义了队列和交换机之间的关系,指定了如何将消息从交换机路由到队列。
- Virtual Host(虚拟主机):rabbitMQ允许将多个逻辑消息队列隔离到不同的虚拟主机中,以实现资源隔离和多租户支持。
工作流程
rabbitMQ的工作流程如下:
- 生产者将消息发布到一个或多个交换机。
- 交换机根据绑定规则将消息路由到一个或多个队列。
- 消费者订阅队列并接收消息。
- 消费者处理消息,并可以确认消息已被成功处理。
- 消息可以持久化到磁盘,以确保在 rabbitMQ重启后不会丢失。
消息确认和持久化
rabbitMQ具有高度的可靠性,它支持消息确认机制,确保消息在成功处理后才从队列中删除。如果消费者在处理消息时发生错误,消息将被重新排队,而不会丢失。此外,rabbitMQ还支持将消息持久化到磁盘,以防止消息在系统故障时丢失。
可用性和扩展性
rabbitMQ具有高可用性和可伸缩性的特性。它支持镜像队列(Queue Mirroring)来确保队列数据的冗余备份,以提高可用性。此外,rabbitMQ集群可以水平扩展,允许将多个节点添加到集群中以增加处理能力。
协议支持
rabbitMQ支持多种协议,包括 AMQP(高级消息队列协议)、STOMP、MQTT 等。这使得不同类型的应用程序可以与 rabbitMQ进行通信,而无需修改现有代码。
应用场景
- 分布式系统通信:用于不同组件或服务之间的消息传递。
- 异步任务处理:将需要执行的任务放入队列,由工作者进行处理。
- 日志和监控数据的收集:将日志和监控数据发送到 rabbitMQ,以进行集中处理和分析。
安装rabbitMQ
在docker安装单机版rabbitMQ
docker-compose.yaml配置文件内容如下:
version: '3'
services:
rabbitmq:
image: rabbitmq:3.12-management
container_name: rabbitmq
hostname: rabbitmq-service
restart: always
ports:
- 5672:5672
- 15672:15672
volumes:
- $PWD/data:/var/lib/rabbitmq
- $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins
- $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
RABBITMQ_DEFAULT_VHOST: /
- enabled_plugins 是设置默认开启的插件,内容为
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]
- rabbitmq_delayed_message_exchange-3.12.0.ez 是延时队列插件。
启动rabbitmq:
docker-compose up -d
可以在浏览器访问管理后台 http://localhost:15672,户名和密码都是guest。
在docker安装高可用的rabbitMQ集群
安装根据实际需要使用普通模式和镜像模式,一共有三个rabbitmq节点和一个高可用代理服务haproxy,haproxy务作为代理连接入口,文件列表如下:
.
├── cluster-entrypoint.sh
├── docker-compose.yml
├── .env
└── haproxy.cfg
(1) 加入rabbitMQ集群的脚本文件cluster-entrypoint.sh内容如下:
#!/bin/bash
set -e
# Start RMQ from entry point.
# This will ensure that environment variables passed
# will be honored
/usr/local/bin/docker-entrypoint.sh rabbitmq-server -detached
# Do the cluster dance
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
# Stop the entire RMQ server. This is done so that we
# can attach to it again, but without the -detached flag
# making it run in the forground
rabbitmqctl stop
# Wait a while for the app to really stop
sleep 2s
# Start it
rabbitmq-server
(2) docker-compose配置文件的环境变量.env内容如下,.env文件包含可用于更改默认用户名,密码等。
RABBITMQ_DEFAULT_USER=guest
RABBITMQ_DEFAULT_PASS=guest
RABBITMQ_DEFAULT_VHOST=/
RABBITMQ_ERLANG_COOKIE=rec123456
(3) 高可用服务的配置文件haproxy.cfg内容如下:
global
log 127.0.0.1 local1
maxconn 4096
defaults
log global
mode tcp
option tcplog
retries 3
option redispatch
maxconn 2000
timeout connect 5000
timeout client 50000
timeout server 50000
listen stats
bind *:1936
mode http
stats enable
stats hide-version
stats realm Haproxy\ Statistics
stats uri /
listen rabbitmq
bind *:5672
mode tcp
balance roundrobin
timeout client 3h
timeout server 3h
option clitcpka
server rabbitmq1 rabbitmq1:5672 check inter 5s rise 2 fall 3
server rabbitmq2 rabbitmq2:5672 check inter 5s rise 2 fall 3
server rabbitmq3 rabbitmq3:5672 check inter 5s rise 2 fall 3
listen mgmt
bind *:15672
mode tcp
balance roundrobin
timeout client 3h
timeout server 3h
option clitcpka
server rabbitmq1 rabbitmq1:15672 check inter 5s rise 2 fall 3
server rabbitmq2 rabbitmq2:15672 check inter 5s rise 2 fall 3
server rabbitmq3 rabbitmq3:15672 check inter 5s rise 2 fall 3
(4) docker-compose.yml配置文件内容如下:
version: '3'
services:
rabbitmq1:
hostname: rabbitmq1
image: rabbitmq:3.12-management
restart: always
environment:
- RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
- RABBITMQ_DEFAULT_VHOST=${RABBITMQ_DEFAULT_VHOST}
volumes:
- ./data/rabbitmq1:/var/lib/rabbitmq/mnesia
rabbitmq2:
hostname: rabbitmq2
image: rabbitmq:3.12-management
restart: always
depends_on:
- rabbitmq1
environment:
- RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
volumes:
- ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
- ./data/rabbitmq2:/var/lib/rabbitmq/mnesia
entrypoint: sh /usr/local/bin/cluster-entrypoint.sh
rabbitmq3:
hostname: rabbitmq3
image: rabbitmq:3.12-management
restart: always
depends_on:
- rabbitmq1
environment:
- RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
volumes:
- ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
- ./data/rabbitmq3:/var/lib/rabbitmq/mnesia
entrypoint: sh /usr/local/bin/cluster-entrypoint.sh
haproxy:
image: haproxy:1.9
restart: always
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
depends_on:
- rabbitmq1
- rabbitmq2
- rabbitmq3
ports:
- 15672:15672
- 5672:5672
启动rabbitMQ集群:
docker-compose up -d
# 默认用户名guest,默认密码guest
# 代理连接地址:localhost:5672
# 管理界面地址:localhost:15672
可以在浏览器访问管理后台 http://localhost:15672,户名和密码都是guest。
在k8s安装rabbitMQ集群
根据实际需要,安装rabbitmq集群可以选择普通模式和镜像模式,如果需要设置为镜像模式,在普通集群的中任意节点启用策略,策略会自动同步到集群节点:
rabbitmqctl set_policy ha-all “^ha.” ‘{“ha-mode”:“all”}’
注意:”^ha” 这个规则要根据自己修改,这个是指同步”ha”开头的队列名称,配置时使用的应用于所有队列,所以表达式为”^“。
创建rabbitmq集群的Erlang cookie,配置文件rabbitmq-secret.yml内容如下:
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-config
namespace: default
data:
erlang-cookie: |-
MTIzNDU2Nzg5MAo=
使用statefulset启动rabbitmq,配置文件rabbitmq-sts.yml内容如下:
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: rabbitmq
spec:
serviceName: rabbitmq
replicas: 3
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3.12-management
lifecycle:
postStart:
exec:
command:
- /bin/sh
- -c
- >
if [ -z "$(grep rabbitmq /etc/resolv.conf)" ]; then
sed "s/^search \([^ ]\+\)/search rabbitmq.\1 \1/" /etc/resolv.conf > /etc/resolv.conf.new;
cat /etc/resolv.conf.new > /etc/resolv.conf;
rm /etc/resolv.conf.new;
fi;
until rabbitmqctl node_health_check; do sleep 1; done;
if [[ "$HOSTNAME" != "rabbitmq-0" && -z "$(rabbitmqctl cluster_status | grep rabbitmq-0)" ]]; then
rabbitmqctl stop_app;
rabbitmqctl join_cluster rabbit@rabbitmq-0;
rabbitmqctl start_app;
fi;
rabbitmqctl set_policy ha-all "." '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
env:
- name: RABBITMQ_ERLANG_COOKIE
valueFrom:
secretKeyRef:
name: rabbitmq-config
key: erlang-cookie
ports:
- containerPort: 5672
name: amqp
volumeMounts:
- name: rabbitmq
mountPath: /var/lib/rabbitmq
volumeClaimTemplates:
- metadata:
name: rabbitmq
annotations:
volume.alpha.kubernetes.io/storage-class: anything
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 2Gi
注:必须准备好存储卷后,pod才能启动。
rabbitmq服务配置文件rabbitmq-svc.yml内容如下,对外开放http端口,其它端口在集群内部使用。
apiVersion: v1
kind: Service
metadata:
# Expose the management HTTP port on each node
name: rabbitmq-management
labels:
app: rabbitmq
spec:
selector:
app: rabbitmq
type: NodePort
ports:
- port: 15672
name: http
nodePort: 30072
---
apiVersion: v1
kind: Service
metadata:
# The required headless service for StatefulSets
name: rabbitmq
labels:
app: rabbitmq
spec:
selector:
app: rabbitmq
clusterIP: None
ports:
- port: 5672
name: amqp
- port: 4369
name: epmd
- port: 25672
name: rabbitmq-dist
启动rabbitmq集群
kubectl apply -f rabbitmq-secret.yml
kubectl apply -f rabbitmq-sts.yml
kubectl apply -f rabbitmq-svc.yml
# 默认用户名guest,默认密码guest
# 代理连接地址:rabbitmq.default:5672
# 管理界面地址:node-ip:30072
Direct类型消息队列的golang示例
生产端示例代码
package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "direct-exchange-demo"
queueName = "direct-queue-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
routingKey1 := "info"
exchange1 := NewDirectExchange(exchangeName, routingKey1)
p1, err := NewProducer(queueName, conn, exchange1)
checkErr(err)
defer p1.Close()
routingKey2 := "warning"
exchange2 := NewDirectExchange(exchangeName, routingKey2)
p2, err := NewProducer(queueName, conn, exchange2)
checkErr(err)
defer p2.Close()
var body string
count := 0
for i := 1; i <= 5; i++ {
body = fmt.Sprintf("%s message %d", routingKey1, i)
err = p1.Publish(ctx, []byte(body)) // p1 发送消息
checkErr(err)
body = fmt.Sprintf("%s message %d", routingKey2, i)
err = p2.Publish(ctx, []byte(body)) // p2 发送消息
checkErr(err)
count += 2
}
fmt.Println("publish total", count)
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型
RoutingKey string // 路由key
}
// NewDirectExchange 实例化一个direct类型交换机
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "direct",
RoutingKey: routingKey,
}
}
// Producer 生产者对象
type Producer struct {
queueName string
exchange *Exchange
conn *amqp.Connection
ch *amqp.Channel
}
// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name, // 队列名称
exchange.RoutingKey, // 路由key
exchange.Name, // 交换机名称
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Producer{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
}, nil
}
// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchange.Name, // exchange name
p.exchange.RoutingKey, // key
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
//DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
return err
}
fmt.Println("[send]: " + string(body))
return nil
}
// Close 关闭生产者
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
消费端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync/atomic"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "direct-exchange-demo"
queueName = "direct-queue-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
routingKey1 := "info"
exchange1 := NewDirectExchange(exchangeName, routingKey1)
c1, err := NewConsumer(ctx, queueName, exchange1, conn)
checkErr(err)
c1.Consume() // 消费消息
defer c1.Close()
routingKey2 := "warning"
exchange2 := NewDirectExchange(exchangeName, routingKey2)
c2, err := NewConsumer(ctx, queueName, exchange2, conn)
checkErr(err)
c2.Consume() // 消费消息
defer c2.Close()
fmt.Println("exit press CTRL+C")
exit := make(chan os.Signal, 1)
signal.Notify(exit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-exit
fmt.Println("exit consume messages, received total", counter)
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key
}
// NewDirectExchange 实例化一个direct类型交换机
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "direct",
RoutingKey: routingKey,
}
}
// Consumer 消费者
type Consumer struct {
ctx context.Context
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
exchange *Exchange
}
// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Consumer{
ctx: ctx,
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
exchange: exchange,
}, nil
}
var counter int32 = 0
// Consume 接收消息
func (c *Consumer) Consume() {
go func() {
fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
for d := range c.delivery {
// 处理消息
if d.RoutingKey == "info" {
fmt.Printf("[info received]: %s\n", d.Body)
} else if d.RoutingKey == "warning" {
fmt.Printf("[warning received]: %s\n", d.Body)
}
atomic.AddInt32(&counter, 1)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
topic类型消息队列的golang示例
topic中,将routingkey通过”.“来分为多个部分,匹配规则
- “*“:代表一个部分,例如routingkey为key1.*或*.key2, topic=key1.key2都可以匹配
- ”#“:代表0个或多个部分,例如routingkey为key1.#或#.key3, topic=key1.key2.key3都可以匹配,注意:如果绑定的路由键为 “#” 时,则接受所有消息,因为路由键所有都匹配。
生产端示例代码
package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "topic-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName := "topic-queue-1"
exchange := NewTopicExchange(exchangeName, "*.orange.*")
p, err := NewProducer(queueName, conn, exchange)
checkErr(err)
key := "key1.orange.key3"
err = p.Publish(ctx, key, []byte(key+" say hello"))
checkErr(err)
defer p.Close()
queueName = "topic-queue-2"
exchange = NewTopicExchange(exchangeName, "*.*.rabbit")
p, err = NewProducer(queueName, conn, exchange)
checkErr(err)
defer p.Close()
key = "key1.key2.rabbit"
err = p.Publish(ctx, key, []byte(key+" say hello"))
checkErr(err)
exchange = NewTopicExchange(exchangeName, "lazy.#")
p, err = NewProducer(queueName, conn, exchange)
checkErr(err)
defer p.Close()
key = "lazy.key2.key3"
err = p.Publish(ctx, key, []byte(key+" say hello"))
checkErr(err)
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key
}
// NewTopicExchange 实例化一个topic类型交换机
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "topic",
RoutingKey: routingKey,
}
}
// Producer 生产者对象
type Producer struct {
queueName string
exchange *Exchange
conn *amqp.Connection
ch *amqp.Channel
}
// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Producer{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
}, nil
}
// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, routingKey string, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchange.Name, // exchange name
routingKey, // key
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
return err
}
fmt.Printf("[send]: %s\n", body)
return nil
}
// Close 关闭
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
消费端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "topic-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
var queueName string // 对应的发送端的队列名称,如果发送的key模糊匹配命中,可以接收到消息
queueName = "topic-queue-1"
exchange := NewTopicExchange(exchangeName, "*.orange.*")
queue, err := NewConsumer(ctx, queueName, exchange, conn)
checkErr(err)
queue.Consume()
defer queue.Close()
queueName = "topic-queue-2"
exchange = NewTopicExchange(exchangeName, "*.*.rabbit")
queue, err = NewConsumer(ctx, queueName, exchange, conn)
checkErr(err)
defer queue.Close()
queue.Consume()
exchange = NewTopicExchange(exchangeName, "lazy.#")
queue, err = NewConsumer(ctx, queueName, exchange, conn)
checkErr(err)
defer queue.Close()
queue.Consume()
fmt.Println("exit press CTRL+C")
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-interrupt
fmt.Println("exit consume messages")
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key
}
// NewTopicExchange 实例化一个topic类型交换机
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "topic",
RoutingKey: routingKey,
}
}
// Consumer 消费者
type Consumer struct {
ctx context.Context
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
exchange *Exchange
}
// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 为消息队列注册消费者
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Consumer{
ctx: ctx,
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
exchange: exchange,
}, nil
}
// Consume 接收消息
func (c *Consumer) Consume() {
go func() {
fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
for d := range c.delivery {
// 处理消息
fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
fanout类型消息队列的golang示例
fanout类型的交换机会将消息分发给所有绑定了此交换机的队列,此时routingkey参数相当于无效。可以使用fanout来实现发布订阅。
生产端示例代码
package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "fanout-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName := "fanout-queue-1"
exchange := NewFanOutExchange(exchangeName)
p, err := NewProducer(queueName, conn, exchange)
checkErr(err)
defer p.Close()
err = p.Publish(ctx, []byte(queueName+" say hello"))
checkErr(err)
queueName = "fanout-queue-2"
exchange = NewFanOutExchange(exchangeName)
p, err = NewProducer(queueName, conn, exchange)
checkErr(err)
defer p.Close()
err = p.Publish(ctx, []byte(queueName+" say hello"))
checkErr(err)
queueName = "fanout-queue-3"
exchange = NewFanOutExchange(exchangeName)
p, err = NewProducer(queueName, conn, exchange)
checkErr(err)
defer p.Close()
err = p.Publish(ctx, []byte(queueName+" say hello"))
checkErr(err)
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持direct、topic、fanout、headers
RoutingKey string // 路由key
}
// NewFanOutExchange 实例化一个fanout类型交换机
func NewFanOutExchange(exchangeName string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "fanout",
RoutingKey: "",
}
}
// Producer 生产者对象
type Producer struct {
queueName string
exchange *Exchange
conn *amqp.Connection
ch *amqp.Channel
}
// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey, // 如果交换机类型为fanout,此参数无效
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Producer{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
}, nil
}
// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchange.Name, // exchange name
"", // key 如果类型为fanout,此参数无效
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
return err
}
fmt.Printf("[send]: %s\n", body)
return nil
}
// Close 关闭生产者
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
消费端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "fanout-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
var queueName string // 对应的发送端的队列名称,如果发送的key模糊匹配命中,可以接收到消息
queueName = "fanout-queue"
exchange := NewFanOutExchange(exchangeName)
queue, err := NewConsumer(ctx, queueName, exchange, conn)
checkErr(err)
queue.Consume() // 消费消息
defer queue.Close()
fmt.Println("exit press CTRL+C")
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-interrupt
fmt.Println("exit consume messages")
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key
}
// NewFanOutExchange 实例化一个fanout类型交换机
func NewFanOutExchange(exchangeName string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "fanout",
RoutingKey: "",
}
}
// Consumer 消费者
type Consumer struct {
ctx context.Context
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
exchange *Exchange
}
// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name, // 队列名称
exchange.RoutingKey, // 如果是fanout类型,无效
exchange.Name, // 交换机名称
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 为消息队列注册消费者
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Consumer{
ctx: ctx,
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
exchange: exchange,
}, nil
}
// Consume 接收消息
func (c *Consumer) Consume() {
go func() {
fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
for d := range c.delivery {
// 处理消息
fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
header类型消息队列的golang示例
headers匹配AMQP消息的header而不是路由键,此时routingkey参数相当于无效,headers交换机和direct交换机类似。
消费方指定的headers中必须指定一个”x-match”的键,键”x-match”的值只有2个
- x-match=all:表示所有的键值对都匹配才能接收到消息
- x-match=any:表示只要键值对匹配就能接收消息
生产端示例代码
package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "headers-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
// 完全匹配headers,消费端才能收到消息
queueName1 := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange1 := NewHeaderExchange(exchangeName, "all", kv1)
p1, err := NewProducer(queueName1, conn, exchange1)
checkErr(err)
defer p1.Close()
headersKey1 := kv1 // 完全匹配,消费端队列可以收到消息
err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 1"))
checkErr(err)
headersKey1 = map[string]interface{}{"k": "v"} // 完全不匹配,消费端队列不能收到消息
err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 2"))
checkErr(err)
headersKey1 = map[string]interface{}{"foo1": "bar1"} // 部分匹配,消费端队列不能收到消息
err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 3"))
checkErr(err)
// 部分匹配headers,消费端能收到消息
queueName2 := "headers-queue-2"
kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
exchange2 := NewHeaderExchange(exchangeName, "any", kv2)
p2, err := NewProducer(queueName2, conn, exchange2)
checkErr(err)
defer p2.Close()
headersKey2 := kv2 // 完全匹配,消费端队列可以收到消息
err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 4"))
checkErr(err)
headersKey2 = map[string]interface{}{"k": "v"} // 完全不匹配,消费端队列不能收到消息
err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 5"))
checkErr(err)
headersKey2 = map[string]interface{}{"foo2": "bar2"} // 部分匹配,消费端队列可以收到消息
err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 6"))
checkErr(err)
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key,如果类型为fanout和headers,此字段无效,不需要设置
Headers map[string]interface{} // 如果类型为headers,此字段必填
}
// NewHeaderExchange 实例化一个header类型的交换机,headerType支持all和any
func NewHeaderExchange(exchangeName string, headerType string, kv map[string]interface{}) *Exchange {
if kv == nil {
kv = make(map[string]interface{})
}
switch headerType {
case "all", "any":
kv["x-match"] = headerType
default:
kv["x-match"] = "all"
}
return &Exchange{
Name: exchangeName,
Type: "headers",
RoutingKey: "",
Headers: kv,
}
}
// Producer 生产者对象
type Producer struct {
queueName string
exchange *Exchange
conn *amqp.Connection
ch *amqp.Channel
}
// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
exchange.Headers,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Producer{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
}, nil
}
// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, headers map[string]interface{}, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchange.Name, // exchange name
p.exchange.RoutingKey, // key
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
Headers: headers,
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
return err
}
fmt.Printf("[send]: %s\n", body)
return nil
}
// Close 关闭生产者
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
消费端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "headers-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName1 := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange1 := NewHeadersExchange(exchangeName, "all", kv1)
c1, err := NewConsumer(ctx, queueName1, exchange1, conn)
checkErr(err)
c1.Consume()
defer c1.Close()
queueName2 := "headers-queue-2"
kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
exchange2 := NewHeadersExchange(exchangeName, "any", kv2)
c2, err := NewConsumer(ctx, queueName2, exchange2, conn)
checkErr(err)
c2.Consume()
defer c2.Close()
fmt.Println("exit press CTRL+C")
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-interrupt
fmt.Println("exit consume messages")
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
RoutingKey string // 路由key,如果类型为fanout和headers,此字段无效,不需要设置
Headers map[string]interface{} // 如果类型为headers,此字段必填
}
// NewHeadersExchange 创建一个header类型的交换机,headerType支持all和any
func NewHeadersExchange(exchangeName string, headerType string, kv map[string]interface{}) *Exchange {
if kv == nil {
kv = make(map[string]interface{})
}
switch headerType {
case "all", "any":
kv["x-match"] = headerType
default:
kv["x-match"] = "all"
}
return &Exchange{
Name: exchangeName,
Type: "headers",
RoutingKey: "",
Headers: kv,
}
}
// Consumer 消费者
type Consumer struct {
ctx context.Context
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
exchange *Exchange
}
// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
exchange.Headers,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 为消息队列注册消费者
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Consumer{
ctx: ctx,
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
exchange: exchange,
}, nil
}
// Consume 接收消息
func (c *Consumer) Consume() {
go func() {
fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
for d := range c.delivery {
// 处理消息
fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
延时类型消息队列的golang示例代码
rabbitMQ默认不支持延时消息队列类型,需要另外安装插件来实现,使用延时队列需要指定具体一种消息类型(direct、topic、fanout、headers),下面以direct类型的延时消息队列为例。
生产端示例代码
package main
import (
"context"
"fmt"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "delayed-message-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName := "delayed-message-queue"
routingKey := "delayed-key"
delayedMessageType := "direct"
exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
q, err := NewProducer(queueName, conn, exchange)
checkErr(err)
defer q.Close()
for i := 1; i <= 5; i++ {
body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i)
err = q.Publish(ctx, time.Second*5, []byte(body)) // 发送消息
checkErr(err)
time.Sleep(time.Second)
}
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
RoutingKey string // 路由key
XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}
// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "x-delayed-message",
RoutingKey: routingKey,
XDelayedMessageType: delayedMessageType,
}
}
// Producer 生产者对象
type Producer struct {
queueName string
exchange *Exchange
conn *amqp.Connection
ch *amqp.Channel
}
// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // x-delayed-message
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
amqp.Table{
"x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
},
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Producer{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
}, nil
}
// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchange.Name, // exchange name
p.exchange.RoutingKey, // key
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化
ContentType: "text/plain",
Body: body,
Headers: amqp.Table{
"x-delay": int(delayTime / time.Millisecond), // 延迟时间: 毫秒
},
},
)
if err != nil {
return err
}
fmt.Printf("[send]: %s\n", body)
return nil
}
// Close 关闭生产者
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
消费端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "delayed-message-exchange-demo"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName := "delayed-message-queue"
routingKey := "delayed-key"
delayedMessageType := "direct"
exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
c, err := NewConsumer(ctx, queueName, exchange, conn)
checkErr(err)
c.Consume() // 消费消息
defer c.Close()
fmt.Println("exit press CTRL+C")
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-interrupt
fmt.Println("exit consume messages")
}
// Exchange 交换机
type Exchange struct {
Name string // exchange名称
Type string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
RoutingKey string // 路由key
XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}
// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
return &Exchange{
Name: exchangeName,
Type: "x-delayed-message",
RoutingKey: routingKey,
XDelayedMessageType: delayedMessageType,
}
}
// Consumer 消费者
type Consumer struct {
ctx context.Context
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
exchange *Exchange
}
// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchange.Name, // 交换机名称
exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
amqp.Table{
"x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
},
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name,
exchange.RoutingKey,
exchange.Name,
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 为消息队列注册消费者
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Consumer{
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
exchange: exchange,
}, nil
}
// Consume 接收消息
func (c *Consumer) Consume() {
go func() {
fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
for d := range c.delivery {
// 处理消息
fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
发布订阅的golang示例代码
发布订阅是在fanout消息类型基础上实现的。
发布端示例代码
package main
import (
"context"
"fmt"
"strconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "pub-sub"
)
func main() {
// 连接rabbitmq
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
// 实例化一个发布者
p, err := NewPublisher(exchangeName, conn)
checkErr(err)
defer p.Close()
ctx := context.Background()
// 发送消息
for i := 1; i <= 10; i++ {
err = p.Publish(ctx, []byte("hello world "+strconv.Itoa(i)))
checkErr(err)
}
}
// Publisher 发布者
type Publisher struct {
exchangeName string
conn *amqp.Connection
ch *amqp.Channel
}
// NewPublisher 实例化一个发布者
func NewPublisher(exchangeName string, conn *amqp.Connection) (*Publisher, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Publisher{
exchangeName: exchangeName,
conn: conn,
ch: ch,
}, nil
}
func (p *Publisher) Publish(ctx context.Context, body []byte) error {
err := p.ch.PublishWithContext(
ctx,
p.exchangeName, // exchange name
"", // 消息类型为fanout,此参数无效
false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
return err
}
fmt.Printf("[send]: %s\n", body)
return nil
}
// Close 关闭生产者
func (p *Publisher) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
订阅端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
url = "amqp://guest:guest@192.168.3.37:5672/"
exchangeName = "pub-sub"
)
func main() {
conn, err := amqp.Dial(url)
checkErr(err)
defer conn.Close()
ctx := context.Background()
queueName1 := "pub-sub-queue-1"
s1, err := NewSubscriber(ctx, exchangeName, queueName1, conn)
checkErr(err)
s1.Subscribe() // 消费信息
defer s1.Close()
queueName2 := "pub-sub-queue-2"
s2, err := NewSubscriber(ctx, exchangeName, queueName2, conn)
checkErr(err)
s2.Subscribe() // 消费信息
defer s2.Close()
fmt.Println("exit press CTRL+C")
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-interrupt
fmt.Println("finished receiving messages")
}
// Subscriber 订阅者
type Subscriber struct {
ctx context.Context
exchangeName string
queueName string
conn *amqp.Connection
ch *amqp.Channel
delivery <-chan amqp.Delivery
}
// NewSubscriber 实例化一个订阅者
func NewSubscriber(ctx context.Context, exchangeName string, queueName string, conn *amqp.Connection) (*Subscriber, error) {
// 创建管道
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// 声明交换机类型
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机的类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否公开,false即公开
false, // 是否等待
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
queueName, // 消息队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性(仅创建它的程序才可用)
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 绑定队列和交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 消息类型为fanout时无效
exchangeName, // 交换机名称
false,
nil,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// 为消息队列注册消费者
delivery, err := ch.ConsumeWithContext(
ctx,
queueName, // queue 名称
"", // consumer 用来区分多个消费者
true, // auto-ack 是否自动应答
false, // exclusive 是否独有
false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // no-wait 是否阻塞
nil, // args
)
if err != nil {
_ = ch.Close()
return nil, err
}
return &Subscriber{
ctx: ctx,
exchangeName: exchangeName,
queueName: queueName,
conn: conn,
ch: ch,
delivery: delivery,
}, nil
}
// Subscribe 订阅
func (c *Subscriber) Subscribe() {
go func() {
fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
for d := range c.delivery {
// 处理消息
fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
}
}()
}
// Close 关闭
func (p *Subscriber) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
死信队列队列
什么是死信队列?
“死信”(Dead Letter)是指消息在正常的消息队列中无法被成功消费或处理而被重新路由到另一个专门的队列中,这个专门的队列称为”死信队列”(Dead Letter Queue,DLQ)。死信队列用于存储那些未能被正常处理的消息,以便后续分析和处理。
以下是一些常见的导致消息变成死信的原因:
- 消息被拒绝(Rejection):消费者处理消息时显式地拒绝了该消息(
Reject
或Nack
),并且没有要求将其重新入队。 - 消息过期(TTL Expiration):消息在队列中存储的时间超过了设置的TTL(声明队列时arg设置参数
x-message-ttl
,单位为毫秒),即消息的存活时间。 - 队列长度限制(Queue Length Limit):队列的消息数量达到了预设的最大值(声明队列时arg设置参数
x-max-length
),新的消息无法被加入到队列中。 - 消息返回到仲裁队列的次数超过了投递限制的次数。
如何设置死信队列
在RabbitMQ中,可以通过设置队列的参数(args)来指定死信队列。主要涉及以下几个参数:
x-dead-letter-exchange
:指定死信消息要发送到的交换机。x-dead-letter-routing-key
:指定死信消息的路由键(可选)。
在go语言设置死信队列 示例。
使用场景
- 错误处理:将无法处理的消息转移到死信队列,可以进行后续分析和重试。
- 消息监控:监控死信队列中的消息,及时发现和处理异常情况。
- 延迟队列:通过TTL和死信队列的组合,可以实现消息的延迟投递。
通过合理配置和使用死信队列,可以提高系统的可靠性和可维护性。
注意事项
设置生产者的的queue的args与消费者queue的args必须一致,否则会报错
Exception (406) Reason: \"PRECONDITION_FAILED - inequivalent arg ......
对于已经设置的queue,如果后面添加args,会报错
报错Exception (504) Reason: "channel/connection is not open"
总结
上面介绍了rabbitMQ各个类型消息队列的简单使用,在实际使用中,连接rabbitMQ应该有网络断开重连功能。如果消费端处理消息比较慢,在消费端设置channel.Qos来限制每次消费消息的数量,平衡消息吞吐量和公平性,防止消费者受到消息突发流量冲击。例如设置prefetch_count=1,则表示每个消费者每次只会处理1条消息,处理完成后才会获取下一条消息。这可以防止少数消费者处理能力弱导致大量消息堆积。 适当地设置这些参数,可以优化rabbitMQ在大量消息场景下的性能表现。
这是在github.com/rabbitmq/amqp091-go
基础上封装的 rabbitmq 库。
参考:
专题「消息中间件」的其它文章 »
- kafka基础和使用 (Jul 11, 2024)