rabbitmq基础和使用

rabbitMQ简介

rabbitMQ是一个广泛使用的开源消息队列系统,它实现了高级消息队列协议(AMQP)标准,为分布式应用程序提供了强大的消息传递功能。rabbitMQ是 Erlang 语言编写的,具有高度的可扩展性和可靠性,因此被广泛用于构建分布式、异步的消息通信系统。

消息队列的概念

消息队列是一种通信模式,用于在不同组件、服务或应用程序之间传递消息。它允许发送者将消息放入队列,而接收者可以从队列中获取消息,实现了解耦、异步通信和数据传递的目标。消息队列通常用于处理以下情况:

  • 异步通信:发送方和接收方之间不需要立即响应,提高了系统的可伸缩性和性能。
  • 任务排队:将需要处理的任务放入队列,由工作进程异步执行。
  • 解耦组件:允许不同的应用程序或服务之间进行松耦合的通信。


rabbitMQ的四种工作模式

  • Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。
  • Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。
  • Fanout Exchange(扇出型交换机): Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列。
  • Headers Exchange(头交换机):Headers类型的交换机是通过headers信息来匹配的,工作原理与direct类型类似。


rabbitMQ 的核心概念

  • Producer(生产者):负责向消息队列发送消息的应用程序或服务。
  • Consumer(消费者):负责从消息队列接收和处理消息的应用程序或服务。
  • Queue(队列):用于存储消息的缓冲区,消费者从队列中获取消息进行处理。
  • Exchange(交换机):接收生产者发送的消息并将其路由到一个或多个队列。
  • Binding(绑定):定义了队列和交换机之间的关系,指定了如何将消息从交换机路由到队列。
  • Virtual Host(虚拟主机):rabbitMQ允许将多个逻辑消息队列隔离到不同的虚拟主机中,以实现资源隔离和多租户支持。


工作流程

rabbitMQ的工作流程如下:

  • 生产者将消息发布到一个或多个交换机。
  • 交换机根据绑定规则将消息路由到一个或多个队列。
  • 消费者订阅队列并接收消息。
  • 消费者处理消息,并可以确认消息已被成功处理。
  • 消息可以持久化到磁盘,以确保在 rabbitMQ重启后不会丢失。


消息确认和持久化

rabbitMQ具有高度的可靠性,它支持消息确认机制,确保消息在成功处理后才从队列中删除。如果消费者在处理消息时发生错误,消息将被重新排队,而不会丢失。此外,rabbitMQ还支持将消息持久化到磁盘,以防止消息在系统故障时丢失。


可用性和扩展性

rabbitMQ具有高可用性和可伸缩性的特性。它支持镜像队列(Queue Mirroring)来确保队列数据的冗余备份,以提高可用性。此外,rabbitMQ集群可以水平扩展,允许将多个节点添加到集群中以增加处理能力。


协议支持

rabbitMQ支持多种协议,包括 AMQP(高级消息队列协议)、STOMP、MQTT 等。这使得不同类型的应用程序可以与 rabbitMQ进行通信,而无需修改现有代码。


应用场景

  • 分布式系统通信:用于不同组件或服务之间的消息传递。
  • 异步任务处理:将需要执行的任务放入队列,由工作者进行处理。
  • 日志和监控数据的收集:将日志和监控数据发送到 rabbitMQ,以进行集中处理和分析。


安装rabbitMQ

在docker安装单机版rabbitMQ

docker-compose.yaml配置文件内容如下:

version: '3'
 
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    hostname: rabbitmq-service
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - $PWD/data:/var/lib/rabbitmq
      - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins
      - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

启动rabbitmq:

docker-compose up -d

可以在浏览器访问管理后台 http://localhost:15672,户名和密码都是guest。


在docker安装高可用的rabbitMQ集群

安装根据实际需要使用普通模式和镜像模式,一共有三个rabbitmq节点和一个高可用代理服务haproxy,haproxy务作为代理连接入口,文件列表如下:

.
├── cluster-entrypoint.sh
├── docker-compose.yml
├── .env
└── haproxy.cfg


(1) 加入rabbitMQ集群的脚本文件cluster-entrypoint.sh内容如下:

#!/bin/bash

set -e

# Start RMQ from entry point.
# This will ensure that environment variables passed
# will be honored
/usr/local/bin/docker-entrypoint.sh rabbitmq-server -detached

# Do the cluster dance
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1

# Stop the entire RMQ server. This is done so that we
# can attach to it again, but without the -detached flag
# making it run in the forground
rabbitmqctl stop

# Wait a while for the app to really stop
sleep 2s

# Start it
rabbitmq-server


(2) docker-compose配置文件的环境变量.env内容如下,.env文件包含可用于更改默认用户名,密码等。

RABBITMQ_DEFAULT_USER=guest
RABBITMQ_DEFAULT_PASS=guest
RABBITMQ_DEFAULT_VHOST=/
RABBITMQ_ERLANG_COOKIE=rec123456


(3) 高可用服务的配置文件haproxy.cfg内容如下:

    global
            log 127.0.0.1   local1
            maxconn 4096
     
    defaults
            log     global
            mode    tcp
            option  tcplog
            retries 3
            option redispatch
            maxconn 2000
            timeout connect 5000
            timeout client 50000
            timeout server 50000
     
    listen  stats
            bind *:1936
            mode http
            stats enable
            stats hide-version
            stats realm Haproxy\ Statistics
            stats uri /
     
    listen rabbitmq
            bind *:5672
            mode            tcp
            balance         roundrobin
            timeout client  3h
            timeout server  3h
            option          clitcpka
            server          rabbitmq1 rabbitmq1:5672  check inter 5s rise 2 fall 3
            server          rabbitmq2 rabbitmq2:5672  check inter 5s rise 2 fall 3
            server          rabbitmq3 rabbitmq3:5672  check inter 5s rise 2 fall 3

    listen mgmt
            bind *:15672
            mode            tcp
            balance         roundrobin
            timeout client  3h
            timeout server  3h
            option          clitcpka
            server          rabbitmq1 rabbitmq1:15672  check inter 5s rise 2 fall 3
            server          rabbitmq2 rabbitmq2:15672  check inter 5s rise 2 fall 3
            server          rabbitmq3 rabbitmq3:15672  check inter 5s rise 2 fall 3


(4) docker-compose.yml配置文件内容如下:

version: '3'

services:

  rabbitmq1:
    hostname: rabbitmq1
    image: rabbitmq:3.12-management
    restart: always
    environment:
      - RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
      - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
      - RABBITMQ_DEFAULT_VHOST=${RABBITMQ_DEFAULT_VHOST}
    volumes:
      - ./data/rabbitmq1:/var/lib/rabbitmq/mnesia

  rabbitmq2:
    hostname: rabbitmq2
    image: rabbitmq:3.12-management
    restart: always
    depends_on:
      - rabbitmq1
    environment:
      - RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
    volumes:
      - ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
      - ./data/rabbitmq2:/var/lib/rabbitmq/mnesia
    entrypoint: sh /usr/local/bin/cluster-entrypoint.sh

  rabbitmq3:
    hostname: rabbitmq3
    image: rabbitmq:3.12-management
    restart: always
    depends_on:
      - rabbitmq1
    environment:
      - RABBITMQ_ERLANG_COOKIE=${RABBITMQ_ERLANG_COOKIE}
    volumes:
      - ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
      - ./data/rabbitmq3:/var/lib/rabbitmq/mnesia
    entrypoint: sh /usr/local/bin/cluster-entrypoint.sh
    
  haproxy:
    image: haproxy:1.9
    restart: always
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - rabbitmq1
      - rabbitmq2
      - rabbitmq3
    ports:
      - 15672:15672
      - 5672:5672


启动rabbitMQ集群:

docker-compose up -d

# 默认用户名guest,默认密码guest
# 代理连接地址:localhost:5672
# 管理界面地址:localhost:15672

可以在浏览器访问管理后台 http://localhost:15672,户名和密码都是guest。


在k8s安装rabbitMQ集群

根据实际需要,安装rabbitmq集群可以选择普通模式和镜像模式,如果需要设置为镜像模式,在普通集群的中任意节点启用策略,策略会自动同步到集群节点:

rabbitmqctl set_policy ha-all “^ha.” ‘{“ha-mode”:“all”}’

注意:”^ha” 这个规则要根据自己修改,这个是指同步”ha”开头的队列名称,配置时使用的应用于所有队列,所以表达式为”^“。


创建rabbitmq集群的Erlang cookie,配置文件rabbitmq-secret.yml内容如下:

apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-config
  namespace: default
data:
  erlang-cookie: |-
    MTIzNDU2Nzg5MAo=


使用statefulset启动rabbitmq,配置文件rabbitmq-sts.yml内容如下:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        lifecycle:
          postStart:
            exec:
              command:
              - /bin/sh
              - -c
              - >
                if [ -z "$(grep rabbitmq /etc/resolv.conf)" ]; then
                  sed "s/^search \([^ ]\+\)/search rabbitmq.\1 \1/" /etc/resolv.conf > /etc/resolv.conf.new;
                  cat /etc/resolv.conf.new > /etc/resolv.conf;
                  rm /etc/resolv.conf.new;
                fi;
                until rabbitmqctl node_health_check; do sleep 1; done;
                if [[ "$HOSTNAME" != "rabbitmq-0" && -z "$(rabbitmqctl cluster_status | grep rabbitmq-0)" ]]; then
                  rabbitmqctl stop_app;
                  rabbitmqctl join_cluster rabbit@rabbitmq-0;
                  rabbitmqctl start_app;
                fi;
                rabbitmqctl set_policy ha-all "." '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
        env:
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-config
              key: erlang-cookie
        ports:
        - containerPort: 5672
          name: amqp
        volumeMounts:
        - name: rabbitmq
          mountPath: /var/lib/rabbitmq
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq
      annotations:
        volume.alpha.kubernetes.io/storage-class: anything
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 2Gi

注:必须准备好存储卷后,pod才能启动。


rabbitmq服务配置文件rabbitmq-svc.yml内容如下,对外开放http端口,其它端口在集群内部使用。

apiVersion: v1
kind: Service
metadata:
  # Expose the management HTTP port on each node
  name: rabbitmq-management
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  type: NodePort
  ports:
  - port: 15672
    name: http
    nodePort: 30072

---

apiVersion: v1
kind: Service
metadata:
  # The required headless service for StatefulSets
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  clusterIP: None
  ports:
  - port: 5672
    name: amqp
  - port: 4369
    name: epmd
  - port: 25672
    name: rabbitmq-dist


启动rabbitmq集群

kubectl apply -f rabbitmq-secret.yml
kubectl apply -f rabbitmq-sts.yml
kubectl apply -f rabbitmq-svc.yml

# 默认用户名guest,默认密码guest
# 代理连接地址:rabbitmq.default:5672
# 管理界面地址:node-ip:30072


Direct类型消息队列的golang示例

direct

生产端示例代码

package main

import (
    "context"
    "fmt"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "direct-exchange-demo"
    queueName    = "direct-queue-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    routingKey1 := "info"
    exchange1 := NewDirectExchange(exchangeName, routingKey1)
    p1, err := NewProducer(queueName, conn, exchange1)
    checkErr(err)
    defer p1.Close()

    routingKey2 := "warning"
    exchange2 := NewDirectExchange(exchangeName, routingKey2)
    p2, err := NewProducer(queueName, conn, exchange2)
    checkErr(err)
    defer p2.Close()

    var body string
    count := 0
    for i := 1; i <= 5; i++ {
        body = fmt.Sprintf("%s message %d", routingKey1, i)
        err = p1.Publish(ctx, []byte(body)) // p1 发送消息
        checkErr(err)

        body = fmt.Sprintf("%s message %d", routingKey2, i)
        err = p2.Publish(ctx, []byte(body)) // p2 发送消息
        checkErr(err)
        count += 2
    }
    fmt.Println("publish total", count)
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型
    RoutingKey string // 路由key
}

// NewDirectExchange 实例化一个direct类型交换机
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "direct",
        RoutingKey: routingKey,
    }
}

// Producer 生产者对象
type Producer struct {
    queueName string
    exchange  *Exchange
    conn      *amqp.Connection
    ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,              // 队列名称
        exchange.RoutingKey, // 路由key
        exchange.Name,       // 交换机名称
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Producer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        exchange:  exchange,
    }, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchange.Name,       // exchange name
        p.exchange.RoutingKey, // key
        false,                 // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,                 // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            //DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化
            ContentType: "text/plain",
            Body:        body,
        },
    )
    if err != nil {
        return err
    }
    fmt.Println("[send]: " + string(body))
    return nil
}

// Close 关闭生产者
func (p *Producer) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


消费端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "direct-exchange-demo"
    queueName    = "direct-queue-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    routingKey1 := "info"
    exchange1 := NewDirectExchange(exchangeName, routingKey1)
    c1, err := NewConsumer(ctx, queueName, exchange1, conn)
    checkErr(err)
    c1.Consume() // 消费消息
    defer c1.Close()

    routingKey2 := "warning"
    exchange2 := NewDirectExchange(exchangeName, routingKey2)
    c2, err := NewConsumer(ctx, queueName, exchange2, conn)
    checkErr(err)
    c2.Consume() // 消费消息
    defer c2.Close()

    fmt.Println("exit press CTRL+C")
    exit := make(chan os.Signal, 1)
    signal.Notify(exit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-exit
    fmt.Println("exit consume messages,  received total", counter)
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string // 路由key
}

// NewDirectExchange 实例化一个direct类型交换机
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "direct",
        RoutingKey: routingKey,
    }
}

// Consumer 消费者
type Consumer struct {
    ctx       context.Context
    queueName string
    conn      *amqp.Connection
    ch        *amqp.Channel
    delivery  <-chan amqp.Delivery
    exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Consumer{
        ctx:       ctx,
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        delivery:  delivery,
        exchange:  exchange,
    }, nil
}

var counter int32 = 0

// Consume 接收消息
func (c *Consumer) Consume() {
    go func() {
        fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
        for d := range c.delivery {
            // 处理消息
            if d.RoutingKey == "info" {
                fmt.Printf("[info received]: %s\n", d.Body)
            } else if d.RoutingKey == "warning" {
                fmt.Printf("[warning received]: %s\n", d.Body)
            }
            atomic.AddInt32(&counter, 1)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (c *Consumer) Close() {
    if c.ch != nil {
        _ = c.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


topic类型消息队列的golang示例

topic

topic中,将routingkey通过”.“来分为多个部分,匹配规则

  • “*“:代表一个部分,例如routingkey为key1.*或*.key2, topic=key1.key2都可以匹配
  • ”#“:代表0个或多个部分,例如routingkey为key1.#或#.key3, topic=key1.key2.key3都可以匹配,注意:如果绑定的路由键为 “#” 时,则接受所有消息,因为路由键所有都匹配。

生产端示例代码

package main

import (
    "context"
    "fmt"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "topic-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName := "topic-queue-1"
    exchange := NewTopicExchange(exchangeName, "*.orange.*")
    p, err := NewProducer(queueName, conn, exchange)
    checkErr(err)
    key := "key1.orange.key3"
    err = p.Publish(ctx, key, []byte(key+" say hello"))
    checkErr(err)
    defer p.Close()

    queueName = "topic-queue-2"
    exchange = NewTopicExchange(exchangeName, "*.*.rabbit")
    p, err = NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer p.Close()
    key = "key1.key2.rabbit"
    err = p.Publish(ctx, key, []byte(key+" say hello"))
    checkErr(err)

    exchange = NewTopicExchange(exchangeName, "lazy.#")
    p, err = NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer p.Close()
    key = "lazy.key2.key3"
    err = p.Publish(ctx, key, []byte(key+" say hello"))
    checkErr(err)
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string // 路由key
}

// NewTopicExchange 实例化一个topic类型交换机
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "topic",
        RoutingKey: routingKey,
    }
}

// Producer 生产者对象
type Producer struct {
    queueName string
    exchange  *Exchange
    conn      *amqp.Connection
    ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Producer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        exchange:  exchange,
    }, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, routingKey string, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchange.Name, // exchange name
        routingKey,      // key
        false,           // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,           // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        body,
        },
    )
    if err != nil {
        return err
    }
    fmt.Printf("[send]: %s\n", body)
    return nil
}

// Close 关闭
func (p *Producer) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


消费端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "topic-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()
    var queueName string // 对应的发送端的队列名称,如果发送的key模糊匹配命中,可以接收到消息

    queueName = "topic-queue-1"
    exchange := NewTopicExchange(exchangeName, "*.orange.*")
    queue, err := NewConsumer(ctx, queueName, exchange, conn)
    checkErr(err)
    queue.Consume()
    defer queue.Close()

    queueName = "topic-queue-2"
    exchange = NewTopicExchange(exchangeName, "*.*.rabbit")
    queue, err = NewConsumer(ctx, queueName, exchange, conn)
    checkErr(err)
    defer queue.Close()
    queue.Consume()
    exchange = NewTopicExchange(exchangeName, "lazy.#")
    queue, err = NewConsumer(ctx, queueName, exchange, conn)
    checkErr(err)
    defer queue.Close()
    queue.Consume()

    fmt.Println("exit press CTRL+C")
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-interrupt
    fmt.Println("exit consume messages")
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string // 路由key
}

// NewTopicExchange 实例化一个topic类型交换机
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "topic",
        RoutingKey: routingKey,
    }
}

// Consumer 消费者
type Consumer struct {
    ctx       context.Context
    queueName string
    conn      *amqp.Connection
    ch        *amqp.Channel
    delivery  <-chan amqp.Delivery
    exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 为消息队列注册消费者
    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Consumer{
        ctx:       ctx,
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        delivery:  delivery,
        exchange:  exchange,
    }, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
    go func() {
        fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
        for d := range c.delivery {
            // 处理消息
            fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (c *Consumer) Close() {
    if c.ch != nil {
        _ = c.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


fanout类型消息队列的golang示例

fanout

fanout类型的交换机会将消息分发给所有绑定了此交换机的队列,此时routingkey参数相当于无效。可以使用fanout来实现发布订阅。

生产端示例代码

package main

import (
    "context"
    "fmt"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "fanout-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName := "fanout-queue-1"
    exchange := NewFanOutExchange(exchangeName)
    p, err := NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer p.Close()
    err = p.Publish(ctx, []byte(queueName+" say hello"))
    checkErr(err)

    queueName = "fanout-queue-2"
    exchange = NewFanOutExchange(exchangeName)
    p, err = NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer p.Close()
    err = p.Publish(ctx, []byte(queueName+" say hello"))
    checkErr(err)

    queueName = "fanout-queue-3"
    exchange = NewFanOutExchange(exchangeName)
    p, err = NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer p.Close()
    err = p.Publish(ctx, []byte(queueName+" say hello"))
    checkErr(err)
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型,支持direct、topic、fanout、headers
    RoutingKey string // 路由key
}

// NewFanOutExchange 实例化一个fanout类型交换机
func NewFanOutExchange(exchangeName string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "fanout",
        RoutingKey: "",
    }
}

// Producer 生产者对象
type Producer struct {
    queueName string
    exchange  *Exchange
    conn      *amqp.Connection
    ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey, // 如果交换机类型为fanout,此参数无效
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Producer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        exchange:  exchange,
    }, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchange.Name, // exchange name
        "",              // key  如果类型为fanout,此参数无效
        false,           // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,           // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        body,
        },
    )
    if err != nil {
        return err
    }
    fmt.Printf("[send]: %s\n", body)
    return nil
}

// Close 关闭生产者
func (p *Producer) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


消费端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "fanout-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()
    var queueName string // 对应的发送端的队列名称,如果发送的key模糊匹配命中,可以接收到消息

    queueName = "fanout-queue"
    exchange := NewFanOutExchange(exchangeName)
    queue, err := NewConsumer(ctx, queueName, exchange, conn)
    checkErr(err)
    queue.Consume() // 消费消息
    defer queue.Close()

    fmt.Println("exit press CTRL+C")
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-interrupt
    fmt.Println("exit consume messages")
}

// Exchange 交换机
type Exchange struct {
    Name       string // exchange名称
    Type       string // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string // 路由key
}

// NewFanOutExchange 实例化一个fanout类型交换机
func NewFanOutExchange(exchangeName string) *Exchange {
    return &Exchange{
        Name:       exchangeName,
        Type:       "fanout",
        RoutingKey: "",
    }
}

// Consumer 消费者
type Consumer struct {
    ctx       context.Context
    queueName string
    conn      *amqp.Connection
    ch        *amqp.Channel
    delivery  <-chan amqp.Delivery
    exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,              // 队列名称
        exchange.RoutingKey, // 如果是fanout类型,无效
        exchange.Name,       // 交换机名称
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 为消息队列注册消费者
    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Consumer{
        ctx:       ctx,
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        delivery:  delivery,
        exchange:  exchange,
    }, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
    go func() {
        fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
        for d := range c.delivery {
            // 处理消息
            fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (c *Consumer) Close() {
    if c.ch != nil {
        _ = c.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


header类型消息队列的golang示例

headers匹配AMQP消息的header而不是路由键,此时routingkey参数相当于无效,headers交换机和direct交换机类似。

消费方指定的headers中必须指定一个”x-match”的键,键”x-match”的值只有2个

  • x-match=all:表示所有的键值对都匹配才能接收到消息
  • x-match=any:表示只要键值对匹配就能接收消息

生产端示例代码

package main

import (
    "context"
    "fmt"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "headers-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    // 完全匹配headers,消费端才能收到消息
    queueName1 := "headers-queue-1"
    kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
    exchange1 := NewHeaderExchange(exchangeName, "all", kv1)
    p1, err := NewProducer(queueName1, conn, exchange1)
    checkErr(err)
    defer p1.Close()
    headersKey1 := kv1 // 完全匹配,消费端队列可以收到消息
    err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 1"))
    checkErr(err)
    headersKey1 = map[string]interface{}{"k": "v"} // 完全不匹配,消费端队列不能收到消息
    err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 2"))
    checkErr(err)
    headersKey1 = map[string]interface{}{"foo1": "bar1"} // 部分匹配,消费端队列不能收到消息
    err = p1.Publish(ctx, headersKey1, []byte(p1.queueName+" say hello 3"))
    checkErr(err)

    // 部分匹配headers,消费端能收到消息
    queueName2 := "headers-queue-2"
    kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
    exchange2 := NewHeaderExchange(exchangeName, "any", kv2)
    p2, err := NewProducer(queueName2, conn, exchange2)
    checkErr(err)
    defer p2.Close()
    headersKey2 := kv2 // 完全匹配,消费端队列可以收到消息
    err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 4"))
    checkErr(err)
    headersKey2 = map[string]interface{}{"k": "v"} // 完全不匹配,消费端队列不能收到消息
    err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 5"))
    checkErr(err)
    headersKey2 = map[string]interface{}{"foo2": "bar2"} // 部分匹配,消费端队列可以收到消息
    err = p2.Publish(ctx, headersKey2, []byte(p2.queueName+" say hello 6"))
    checkErr(err)
}

// Exchange 交换机
type Exchange struct {
    Name       string                 // exchange名称
    Type       string                 // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string                 // 路由key,如果类型为fanout和headers,此字段无效,不需要设置
    Headers    map[string]interface{} // 如果类型为headers,此字段必填
}

// NewHeaderExchange 实例化一个header类型的交换机,headerType支持all和any
func NewHeaderExchange(exchangeName string, headerType string, kv map[string]interface{}) *Exchange {
    if kv == nil {
        kv = make(map[string]interface{})
    }

    switch headerType {
    case "all", "any":
        kv["x-match"] = headerType
    default:
        kv["x-match"] = "all"
    }

    return &Exchange{
        Name:       exchangeName,
        Type:       "headers",
        RoutingKey: "",
        Headers:    kv,
    }
}

// Producer 生产者对象
type Producer struct {
    queueName string
    exchange  *Exchange
    conn      *amqp.Connection
    ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        exchange.Headers,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Producer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        exchange:  exchange,
    }, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, headers map[string]interface{}, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchange.Name,       // exchange name
        p.exchange.RoutingKey, // key
        false,                 // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,                 // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            Headers:     headers,
            ContentType: "text/plain",
            Body:        body,
        },
    )
    if err != nil {
        return err
    }
    fmt.Printf("[send]: %s\n", body)
    return nil
}

// Close 关闭生产者
func (p *Producer) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


消费端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "headers-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName1 := "headers-queue-1"
    kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
    exchange1 := NewHeadersExchange(exchangeName, "all", kv1)
    c1, err := NewConsumer(ctx, queueName1, exchange1, conn)
    checkErr(err)
    c1.Consume()
    defer c1.Close()

    queueName2 := "headers-queue-2"
    kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
    exchange2 := NewHeadersExchange(exchangeName, "any", kv2)
    c2, err := NewConsumer(ctx, queueName2, exchange2, conn)
    checkErr(err)
    c2.Consume()
    defer c2.Close()

    fmt.Println("exit press CTRL+C")
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-interrupt
    fmt.Println("exit consume messages")
}

// Exchange 交换机
type Exchange struct {
    Name       string                 // exchange名称
    Type       string                 // exchange类型,支持"direct"、"topic"、"fanout"、"headers"
    RoutingKey string                 // 路由key,如果类型为fanout和headers,此字段无效,不需要设置
    Headers    map[string]interface{} // 如果类型为headers,此字段必填
}

// NewHeadersExchange 创建一个header类型的交换机,headerType支持all和any
func NewHeadersExchange(exchangeName string, headerType string, kv map[string]interface{}) *Exchange {
    if kv == nil {
        kv = make(map[string]interface{})
    }

    switch headerType {
    case "all", "any":
        kv["x-match"] = headerType
    default:
        kv["x-match"] = "all"
    }

    return &Exchange{
        Name:       exchangeName,
        Type:       "headers",
        RoutingKey: "",
        Headers:    kv,
    }
}

// Consumer 消费者
type Consumer struct {
    ctx       context.Context
    queueName string
    conn      *amqp.Connection
    ch        *amqp.Channel
    delivery  <-chan amqp.Delivery
    exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        exchange.Headers,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 为消息队列注册消费者
    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Consumer{
        ctx:       ctx,
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        delivery:  delivery,
        exchange:  exchange,
    }, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
    go func() {
        fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
        for d := range c.delivery {
            // 处理消息
            fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (c *Consumer) Close() {
    if c.ch != nil {
        _ = c.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


延时类型消息队列的golang示例代码

rabbitMQ默认不支持延时消息队列类型,需要另外安装插件来实现,使用延时队列需要指定具体一种消息类型(direct、topic、fanout、headers),下面以direct类型的延时消息队列为例。

生产端示例代码

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "delayed-message-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName := "delayed-message-queue"
    routingKey := "delayed-key"
    delayedMessageType := "direct"
    exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
    q, err := NewProducer(queueName, conn, exchange)
    checkErr(err)
    defer q.Close()
    for i := 1; i <= 5; i++ {
        body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i)
        err = q.Publish(ctx, time.Second*5, []byte(body)) // 发送消息
        checkErr(err)
        time.Sleep(time.Second)
    }
}

// Exchange 交换机
type Exchange struct {
    Name                string // exchange名称
    Type                string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
    RoutingKey          string // 路由key
    XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
    return &Exchange{
        Name:                exchangeName,
        Type:                "x-delayed-message",
        RoutingKey:          routingKey,
        XDelayedMessageType: delayedMessageType,
    }
}

// Producer 生产者对象
type Producer struct {
    queueName string
    exchange  *Exchange
    conn      *amqp.Connection
    ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, //  x-delayed-message
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        amqp.Table{
            "x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
        },
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Producer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        exchange:  exchange,
    }, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchange.Name,       // exchange name
        p.exchange.RoutingKey, // key
        false,                 // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,                 // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化
            ContentType:  "text/plain",
            Body:         body,
            Headers: amqp.Table{
                "x-delay": int(delayTime / time.Millisecond), // 延迟时间: 毫秒
            },
        },
    )
    if err != nil {
        return err
    }
    fmt.Printf("[send]: %s\n", body)
    return nil
}

// Close 关闭生产者
func (p *Producer) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


消费端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "delayed-message-exchange-demo"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName := "delayed-message-queue"
    routingKey := "delayed-key"
    delayedMessageType := "direct"
    exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
    c, err := NewConsumer(ctx, queueName, exchange, conn)
    checkErr(err)
    c.Consume() // 消费消息
    defer c.Close()

    fmt.Println("exit press CTRL+C")
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-interrupt
    fmt.Println("exit consume messages")
}

// Exchange 交换机
type Exchange struct {
    Name                string // exchange名称
    Type                string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
    RoutingKey          string // 路由key
    XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
    return &Exchange{
        Name:                exchangeName,
        Type:                "x-delayed-message",
        RoutingKey:          routingKey,
        XDelayedMessageType: delayedMessageType,
    }
}

// Consumer 消费者
type Consumer struct {
    ctx       context.Context
    queueName string
    conn      *amqp.Connection
    ch        *amqp.Channel
    delivery  <-chan amqp.Delivery
    exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchange.Name, // 交换机名称
        exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers
        true,          // 是否持久化
        false,         // 是否自动删除
        false,         // 是否公开,false即公开
        false,         // 是否等待
        amqp.Table{
            "x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
        },
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,
        exchange.RoutingKey,
        exchange.Name,
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 为消息队列注册消费者
    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Consumer{
        queueName: queueName,
        conn:      conn,
        ch:        ch,
        delivery:  delivery,
        exchange:  exchange,
    }, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
    go func() {
        fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
        for d := range c.delivery {
            // 处理消息
            fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (c *Consumer) Close() {
    if c.ch != nil {
        _ = c.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


发布订阅的golang示例代码

发布订阅是在fanout消息类型基础上实现的。

发布端示例代码

package main

import (
    "context"
    "fmt"
    "strconv"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "pub-sub"
)

func main() {
    // 连接rabbitmq
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    // 实例化一个发布者
    p, err := NewPublisher(exchangeName, conn)
    checkErr(err)
    defer p.Close()

    ctx := context.Background()

    // 发送消息
    for i := 1; i <= 10; i++ {
        err = p.Publish(ctx, []byte("hello world "+strconv.Itoa(i)))
        checkErr(err)
    }
}

// Publisher 发布者
type Publisher struct {
    exchangeName string
    conn         *amqp.Connection
    ch           *amqp.Channel
}

// NewPublisher 实例化一个发布者
func NewPublisher(exchangeName string, conn *amqp.Connection) (*Publisher, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchangeName, // 交换机名称
        "fanout",     // 交换机的类型
        true,         // 是否持久化
        false,        // 是否自动删除
        false,        // 是否公开,false即公开
        false,        // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Publisher{
        exchangeName: exchangeName,
        conn:         conn,
        ch:           ch,
    }, nil
}

func (p *Publisher) Publish(ctx context.Context, body []byte) error {
    err := p.ch.PublishWithContext(
        ctx,
        p.exchangeName, // exchange name
        "",             // 消息类型为fanout,此参数无效
        false,          // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
        false,          // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        body,
        },
    )
    if err != nil {
        return err
    }
    fmt.Printf("[send]: %s\n", body)
    return nil
}

// Close 关闭生产者
func (p *Publisher) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


订阅端示例代码

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

var (
    url          = "amqp://guest:guest@192.168.3.37:5672/"
    exchangeName = "pub-sub"
)

func main() {
    conn, err := amqp.Dial(url)
    checkErr(err)
    defer conn.Close()

    ctx := context.Background()

    queueName1 := "pub-sub-queue-1"
    s1, err := NewSubscriber(ctx, exchangeName, queueName1, conn)
    checkErr(err)
    s1.Subscribe() // 消费信息
    defer s1.Close()

    queueName2 := "pub-sub-queue-2"
    s2, err := NewSubscriber(ctx, exchangeName, queueName2, conn)
    checkErr(err)
    s2.Subscribe() // 消费信息
    defer s2.Close()

    fmt.Println("exit press CTRL+C")
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-interrupt
    fmt.Println("finished receiving messages")
}

// Subscriber 订阅者
type Subscriber struct {
    ctx          context.Context
    exchangeName string
    queueName    string
    conn         *amqp.Connection
    ch           *amqp.Channel
    delivery     <-chan amqp.Delivery
}

// NewSubscriber 实例化一个订阅者
func NewSubscriber(ctx context.Context, exchangeName string, queueName string, conn *amqp.Connection) (*Subscriber, error) {
    // 创建管道
    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // 声明交换机类型
    err = ch.ExchangeDeclare(
        exchangeName, // 交换机名称
        "fanout",     // 交换机的类型
        true,         // 是否持久化
        false,        // 是否自动删除
        false,        // 是否公开,false即公开
        false,        // 是否等待
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 声明队列,如果队列不存在则自动创建,存在则跳过创建
    q, err := ch.QueueDeclare(
        queueName, // 消息队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否具有排他性(仅创建它的程序才可用)
        false,     // 是否阻塞处理
        nil,       // 额外的属性
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 绑定队列和交换机
    err = ch.QueueBind(
        q.Name,       // 队列名称
        "",           // 消息类型为fanout时无效
        exchangeName, // 交换机名称
        false,
        nil,
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    // 为消息队列注册消费者
    delivery, err := ch.ConsumeWithContext(
        ctx,
        queueName, // queue 名称
        "",        // consumer 用来区分多个消费者
        true,      // auto-ack 是否自动应答
        false,     // exclusive 是否独有
        false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
        false,     // no-wait 是否阻塞
        nil,       // args
    )
    if err != nil {
        _ = ch.Close()
        return nil, err
    }

    return &Subscriber{
        ctx:          ctx,
        exchangeName: exchangeName,
        queueName:    queueName,
        conn:         conn,
        ch:           ch,
        delivery:     delivery,
    }, nil
}

// Subscribe 订阅
func (c *Subscriber) Subscribe() {
    go func() {
        fmt.Printf("waiting for messages, queue = %s\n", c.queueName)
        for d := range c.delivery {
            // 处理消息
            fmt.Printf("[%s received]: %s\n", c.queueName, d.Body)
            // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
        }
    }()
}

// Close 关闭
func (p *Subscriber) Close() {
    if p.ch != nil {
        _ = p.ch.Close()
    }
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}


总结

上面介绍了rabbitMQ各个类型消息队列的简单使用,在实际使用中,连接rabbitMQ应该有网络断开重连功能。如果消费端处理消息比较慢,在消费端设置channel.Qos来限制每次消费消息的数量,平衡消息吞吐量和公平性,防止消费者受到消息突发流量冲击。例如设置prefetch_count=1,则表示每个消费者每次只会处理1条消息,处理完成后才会获取下一条消息。这可以防止少数消费者处理能力弱导致大量消息堆积。 适当地设置这些参数,可以优化rabbitMQ在大量消息场景下的性能表现。

这是在github.com/rabbitmq/amqp091-go基础上封装的 rabbitmq 库。


参考:



专题「消息中间件」的其它文章 »