【c++中间件】RabbitMQ介绍 && AMQP-CPP库的使用 && 二次封装

作者:利刃大大日期:2025/11/28

文章目录

  • Ⅰ. 安装 RabbitMQ
    • RabbitMQ 服务安装与使用
    • 安装 RabbitMQ 的 C++ 客户端库
      • 安装 AMQP-CPP
  • Ⅱ. RabbitMQ 的介绍
      • RabbitMQ 服务与客户端的通信原理
        • 简单通信流程
  • Ⅲ. AMQP-CPP 库的简单使用
    • 一、介绍
    • 二、使用
    • 三、常用类与接口介绍
      • Channel 类
        • libev
    • 使用样例
  • Ⅳ. 二次封装
      • 测试代码

在这里插入图片描述

Ⅰ. 安装 RabbitMQ

RabbitMQ 服务安装与使用

1sudo apt install -y rabbitmq-server
2

​ 一般来说默认端口为 15672

1# 启动服务
2sudo systemctl start rabbitmq-server.service
3# 查看服务状态
4sudo systemctl status rabbitmq-server.service
5
6# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息:
7 # 添加用户和密码
8sudo rabbitmqctl add_user root 123456
9 # 设置用户tag
10sudo rabbitmqctl set_user_tags root administrator
11 # 设置用户权限
12sudo rabbitmqctl set_permissions -p / root "." "." ".*"
13
14# RabbitMQ 自带了 web 管理界面,执行下面命令开启
15sudo rabbitmq-plugins enable rabbitmq_management
16

​ 网页访问的默认端口为 15672:(注意 rabbitmq 服务的端口是 5672,注意区分!

在这里插入图片描述

安装 RabbitMQ 的 C++ 客户端库

​ 我们这里使用 AMQP-CPP 库来编写客户端程序!

安装 AMQP-CPP

1sudo apt install -y libev-dev #libev 网络库组件
2git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
3cd AMQP-CPP/
4make 
5sudo make install
6

​ 如果安装时候出现了 SSL 相关的版本问题,可以通过以下命令来解决:

1dpkg -l |grep ssl
2sudo dpkg -P --force-all libevent-openssl-2.1-7
3sudo dpkg -P --force-all openssl
4sudo dpkg -P --force-all libssl-dev 
5sudo apt --fix-broken install
6

​ 然后重新 make 即可!

Ⅱ. RabbitMQ 的介绍

RabbitMQ 是一个基于 AMQPAdvanced Message Queuing Protocol)的开源消息代理,广泛用于消息传递和消息队列管理。其核心功能是 提供异步的消息传递机制,支持多个消费者和生产者之间的松耦合通信

RabbitMQ 服务与客户端的通信原理

  1. 连接建立(Connection)
  • 客户端通过 AMQP 协议与 RabbitMQ 服务器建立连接。RabbitMQ 使用 TCP/IP 作为传输协议,默认端口为 5672
  • 客户端首先创建一个与 RabbitMQTCP 连接,这个连接是双向的,可以承载多个 通道Channel)。
  1. 通道(Channel)
  • 在一个 TCP 连接上,客户端和 RabbitMQ 可以开辟多个通道。每个通道都是一个独立的、轻量级的通信渠道,用于进行消息的发送与接收。
  • 每个通道都对应一个 AMQP 会话,所有操作(如声明队列、发布消息、消费消息等)都通过通道进行。
  • 这样做的好处是,一个 TCP 连接可以并发处理多个请求,减少了连接的开销和延迟。
  1. 声明队列(Queue)
  • 在客户端和 RabbitMQ 服务器之间进行消息传递前,客户端需要声明一个队列。队列是消息的存储位置,消息会被传递到指定的队列中。
  • 队列是由生产者声明的,消费者会从队列中获取消息!
  • 也可以通过 DurabilityPersistence 来确保队列和消息在 RabbitMQ 服务器重启后不丢失。
  1. 交换机(Exchange)
  • 交换机是 RabbitMQ 中的核心组件,负责根据路由规则将消息分发到适当的队列。客户端将消息发送到交换机,而交换机会根据绑定的路由键(routing key)将消息路由到对应的队列。
  • RabbitMQ 支持几种类型的交换机:
    • Direct Exchange:根据路由键一对一将消息发送到队列。
    • Fanout Exchange:将消息广播到所有绑定到它的队列。
    • Topic Exchange:根据路由键模式(类似正则表达式)将消息路由到一个或多个队列。
    • Headers Exchange:通过消息头部的属性来决定消息的路由。
  1. 发布消息(Publishing)
  • 生产者(Producer:生产者将消息发布到交换机。生产者通常不会直接与队列通信,而是将消息发送到交换机,交换机决定消息的路由。
  • 消息可以携带属性,如 routing key(路由键),用于帮助交换机决定消息的路由方向。
  • 如果消息发布时队列未创建,生产者可以要求 RabbitMQ 自动创建队列,或自己事先声明队列。
  1. 消息消费(Consuming)
  • 消费者(Consumer:消费者订阅队列,从队列中获取消息并进行处理。一个队列可以有多个消费者,RabbitMQ 会轮流将消息分发给所有绑定的消费者。
  • 当消息被消费者成功消费后,可以根据消息的 确认机制 来决定是否从队列中删除消息。
  • 消费者可以使用两种主要的确认方式:
    • 自动确认:消费者在获取消息后会自动确认消息已处理,RabbitMQ 会将该消息从队列中移除。
    • 手动确认:消费者处理完消息后需要明确调用确认方法,确保消息在处理完成后才会被移除。
  1. 消息确认(Acknowledgment)
  • 为了保证消息的可靠性,RabbitMQ 提供了消息确认机制。当消费者从队列中获取到消息后,可以选择确认消息,表示消息已成功处理。
  • 如果消费者在处理消息时出现异常,消息不会被确认,RabbitMQ 会将消息重新放入队列供其他消费者重新消费(也可以设置消息的重试次数和延迟时间)。
  1. 消息持久化(Persistence)
  • 队列持久化:在声明队列时,可以将其设置为持久化,确保即使 RabbitMQ 服务器重启,队列本身依然存在。
  • 消息持久化:可以通过将消息标记为持久化(delivery_mode=2)来确保消息在 RabbitMQ 重启后不丢失。
  1. 消息路由(Routing)
  • 交换机根据绑定到它的队列和路由键,决定如何将消息路由到一个或多个队列。
  • 如果消息没有找到匹配的队列,通常情况下,消息会被丢弃,除非启用了死信队列 DLQDead Letter Queue)来接收无法路由的消息。

简单通信流程

在这里插入图片描述

  1. 生产者 将消息发送到指定的 交换机,消息携带一个路由键。
  2. 交换机 根据路由规则将消息发送到一个或多个队列。
  3. 消费者 监听并消费从队列中获取到的消息,进行处理。
  4. 消费者处理完消息后,通过确认机制告知 RabbitMQ 消息已处理。
  5. RabbitMQ 根据确认机制和消息持久化设置,确保消息不会丢失。

Ⅲ. AMQP-CPP 库的简单使用

一、介绍

AMQP-CPP 是一个用于实现 AMQP (Advanced Message Queuing Protocol) 协议的 C++ 库。AMQP 是一种广泛应用于消息中间件的开放标准协议,支持在分布式系统中高效、安全地交换消息。AMQP-CPP 库使得 C++ 开发者能够轻松实现 AMQP 协议的客户端和服务器,通常用于与 RabbitMQ 等消息中间件进行交互。

AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。

​ 注意事项:这个库需要 c++17 的支持

​ 主要功能有:

  • 连接和通道管理AMQP-CPP 支持通过 AMQP 协议连接到 RabbitMQ 服务器,并能够创建多个通道进行并发消息处理。
  • 消息发布与订阅:可以方便地进行消息发布(生产者)和订阅(消费者),支持不同类型的交换机(DirectTopicFanout 等)和队列。
  • 队列管理:支持创建、删除、绑定和监听队列。
  • 异常处理和重试机制:内置异常处理,能够在连接断开后自动进行重试,确保消息通信的稳定性。

二、使用

通常 AMQP-CPP 的使用有两种模式:

  1. 使用默认的 TCP 模块进行网络通信(相当于自己手动实现,比较麻烦!)
    • 实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接
    • 重写相关函数, 其中必须重写 monitor 函数
    • monitor 函数中需要实现的是将 fd 放入 eventloopselectepoll)中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPPconnection->process(fd, flags) 方法
  2. 使用扩展的 libeventlibevlibuvasio 异步通信组件进行通信(相当于有现成的组件供我们直接使用,我们优先用这种模式!
    • libev 为例, 我们不必要自己实现 monitor 函数, 可以直接使用 AMQP::LibEvHandler

​ 在本项目编译链接的时候,记得链接动态库:-lamqpcpp -lev

三、常用类与接口介绍

Channel 类

channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel

​ 因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。

1namespace AMQP {
2    /**
3     * 通用回调函数类型,用于许多延迟处理对象
4     */
5    using SuccessCallback = std::function<void()>;  			    // 成功回调函数类型,无参数
6    using ErrorCallback = std::function<void(const char *message)>;  // 错误回调函数类型,带错误信息
7    using FinalizeCallback = std::function<void()>;  			    // 完成回调函数类型,无参数
8
9    // 队列回调,返回队列名称、消息数和消费者数
10    using QueueCallback = std::function<void(const std::string &name, 
11                                             uint32_t messagecount, 
12                                             uint32_t consumercount)>;  
13    // 删除队列回调,返回删除的消息数
14    using DeleteCallback = std::function<void(uint32_t deletedmessages)>; 
15	
16    // 消息接收回调,返回消息对象、交付标签和是否重新投递的标志
17    using MessageCallback = std::function<void(const Message &message, 
18                                               uint64_t deliveryTag, 
19                                               bool redelivered)>;  
20    /**
21     * 发布者确认回调,返回消息交付标签和是否是多个消息
22     * 当服务器确认消息已被接收和处理时,触发此回调
23     */
24    using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;  
25
26    /**
27     * 使用确认包裹通道时,当消息被 ack/nacked 时,调用这些回调
28     */
29    using PublishAckCallback = std::function<void()>;   // 发布确认回调
30    using PublishNackCallback = std::function<void()>;  // 发布未确认回调
31    using PublishLostCallback = std::function<void()>;  // 发布消息丢失回调
32
33    /**
34     * Channel 类定义
35     * 管理与 RabbitMQ 服务器之间的消息通道
36     */
37    class Channel {
38    public:
39        // 构造函数,创建一个通道,并与指定连接绑定
40        Channel(Connection *connection);
41
42        // 检查通道是否已经连接
43        bool connected();
44
45        /**
46         * 声明交换机
47         * 如果交换机名称为空,服务器会自动分配一个名称
48         * 支持的交换机类型:fanout(广播)、direct(直接)、topic(主题)、headers、consistent_hash、message_deduplication
49         * 以下 flags 可用于交换机:
50         *	 -durable 持久化,重启后交换机依然有效
51         *	 -autodelete 删除所有连接的队列后,自动删除交换
52         *	 -passive 仅被动检查交换机是否存在
53         *	 -internal 创建内部交换
54         * 此函数返回一个延迟处理程序,可以安装回调 using onSuccess(), onError()  onFinalize()
55         * 这里我们只用到 onError(),如下所示:
56         *   Deferred &onError(const char *message)
57 		*/
58        Deferred &declareExchange(
59            const std::string_view &name,  // 交换机名称
60            ExchangeType type,  // 交换机类型
61            int flags,  // 交换机标志
62            const Table &arguments  // 其他参数
63        );
64
65        /**
66         * 声明队列
67         * 如果队列名称为空,服务器会自动分配一个名称
68         * flags 可以是以下值的组合:
69         *	 -durable 持久队列在代理重新启动后仍然有效
70         *	 -autodelete 当所有连接的使用者都离开时,自动删除队列
71         *	 -passive 仅被动检查队列是否存在
72         *	 -exclusive 队列仅存在于此连接,并且在连接断开时自动删除
73         *
74         * 可以安装的 onSuccess()回调应该具有以下签名:
75             void myCallback(const std::string &name, 
76             				uint32_t messageCount, 
77             				uint32_t consumerCount);
78             例如:
79             channel.declareQueue("myqueue").onSuccess(
80             		[](const std::string &name, 
81             		   uint32_t messageCount,
82             		   uint32_t consumerCount) {
83             	std::cout << "Queue '" << name << "' ";
84             	std::cout << "has been declared with ";
85             	std::cout << messageCount;
86                 std::cout << " messages and ";
87                 std::cout << consumerCount;
88                 std::cout << " consumers" << std::endl;
89             });
90         */
91        DeferredQueue &declareQueue(
92            const std::string_view &name,  // 队列名称
93            int flags,  // 队列标志
94            const Table &arguments  // 其他参数
95        );
96
97        /**
98         * 将队列绑定到交换机
99         * 参数:交换机名称、队列名称、路由密钥以及其他绑定参数
100         */
101        Deferred &bindQueue(
102            const std::string_view &exchange,    // 交换机名称
103            const std::string_view &queue,     	 // 队列名称
104            const std::string_view &routingkey,  // 路由密钥
105            const Table &arguments  // 其他绑定参数
106        );
107
108        /**
109         * 将消息发布到交换机
110         * 必须提供交换机的名称和路由密钥,然后 RabbitMQ 会将消息发送到符合条件的队列
111         * 可选的 flags 参数可以指定如果消息无法路由到队列时应该发生的情况。默认情况下,不可更改的消息将被静默地丢弃。
112         * 如果设置了 mandatory  immediate 标志,则无法处理的消息将返回到应用程序。
113         * 在开始发布之前,请确保已经调用了 recall() 方法,并设置了所有适当的处理程序来处理这些返回的消息。
114         * 可以提供以下 flags:
115         *   -mandatory 如果设置,服务器将返回未发送到队列的消息
116         *   -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
117         */
118        bool publish(
119            const std::string_view &exchange,    // 交换机名称
120            const std::string_view &routingKey,  // 路由密钥
121            const std::string &message,  // 消息内容
122            int flags = 0  // 可选标志
123        );
124
125        /**
126         * 告诉 RabbitMQ 服务器我们已经准备好开始消费队列消息
127         * 如果未指定消费者标签,服务器会自动分配一个
128         * 支持以下 flags: 
129         *   -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
130         *   -noack 如果设置了,则不必对已消费的消息进行确认
131         *   -exclusive 请求独占访问,只有此使用者可以访问队列
132         
133         * 可以安装的 onSuccess()回调应该具有以下格式:
134         	 void myCallback(const std::string_view&tag);
135         样例:
136             channel.consume("myqueue").onSuccess([](const std::string_view& tag) {
137             		std::cout << "Started consuming under tag ";
138             		std::cout << tag << std::endl;
139             });
140         */
141        DeferredConsumer &consume(
142            const std::string_view &queue,  // 队列名称
143            const std::string_view &tag,    // 消费者标签
144            int flags,  // 消费者标志
145            const Table &arguments  // 其他参数
146        );
147
148        /**
149         * 确认接收到的消息
150         * 消费者需要确认收到消息,告知 RabbitMQ 将消息从队列中移除
151         * 支持多条确认:之前所有未确认的消息将一并确认
152         * 当在 DeferredConsumer::onReceived() 方法中接收到消息时,必须确认该消息,以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。
153         */
154        bool ack(uint64_t deliveryTag, int flags = 0);  // 确认消息,返回是否成功
155    };
156
157    /**
158     * DeferredConsumer 类定义
159     * 管理延迟消费者,支持注册回调函数处理不同的事件
160     */
161    class DeferredConsumer {
162    public:
163        // 注册消费者启动时的回调函数
164        DeferredConsumer &onSuccess(const ConsumeCallback& callback);
165
166        // 注册接收到消息时的回调函数
167        DeferredConsumer &onReceived(const MessageCallback& callback);
168
169        // 注册接收到消息的别名
170        DeferredConsumer &onMessage(const MessageCallback& callback);
171
172        // 注册服务器取消消费者时的回调函数
173        DeferredConsumer &onCancelled(const CancelCallback& callback);
174    };
175
176    /**
177     * 消息类,代表通过 RabbitMQ 发送的消息
178     */
179    class Message : public Envelope {
180    public:
181        const std::string &exchange();    // 获取消息的交换机
182        const std::string &routingkey();  // 获取消息的路由键
183    };
184
185    /**
186     * 信封类,代表包含元数据和消息体的结构
187     */
188    class Envelope : public MetaData {
189    public:
190        const char *body();   // 获取消息体内容(没有以'\0'结尾,所以获取时候需要结合下面的bodySize()接口)
191        uint64_t bodySize();  // 获取消息体的大小
192    };
193}
194

libev

libev 是一个高效的事件循环库,广泛用于事件驱动的编程模型中。它提供了一个轻量级、低延迟的事件循环实现,能够高效地处理 I/O 操作、定时器和其他异步事件。比如:

  • ev_loop:事件循环对象,管理和调度事件
  • ev_async:用于处理异步事件的结构体,通常用于线程间的异步通知
  • ev_run:启动事件循环
  • ev_break:退出事件循环
1// 定义 ev_async 结构体,表示异步事件
2typedef struct ev_async {
3    EV_WATCHER(ev_async)  // 宏,标记此结构体为事件观察者(watcher)
4    EV_ATOMIC_T sent;     // 标记是否发送过异步事件,通常是一个原子操作标志
5} ev_async;
6
7// 事件循环的中断类型枚举
8enum {
9    EVBREAK_CANCEL = 0,   // 取消循环,撤销unloop操作
10    EVBREAK_ONE = 1,      // 退出一次循环,只停止当前事件循环
11    EVBREAK_ALL = 2       // 退出所有事件循环,完全停止
12};
13
14// 默认事件循环初始化函数
15// flags 参数可以指定额外的标志,默认值为 0
16// EV_CPP 是用于 C++ 环境的一个宏,通常设为0表示默认行为
17struct ev_loop *ev_default_loop(unsigned int flags EV_CPP (= 0));
18
19// 宏定义,表示初始化默认的事件循环
20# define EV_DEFAULT ev_default_loop(0)
21
22// 启动事件循环,loop 是事件循环对象
23// 事件循环将监听并处理所有注册的事件和回调
24int ev_run(struct ev_loop *loop);
25
26// 退出事件循环
27// break_type 表示退出的类型,控制如何退出循环
28//   -EVBREAK_CANCEL: 撤销之前的 unloop 操作
29//   -EVBREAK_ONE: 停止一次循环
30//   -EVBREAK_ALL: 停止所有循环,彻底退出
31void ev_break(struct ev_loop *loop, int32_t break_type);
32
33// 异步事件回调函数的类型定义
34// 该回调函数会在事件触发时被调用
35void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
36
37// 初始化一个 ev_async 结构体,并设置其回调函数
38// 当事件触发时,ev_async 的回调函数会被调用
39void ev_async_init(ev_async *w, callback cb);
40
41// 将异步事件 watcher 添加到事件循环 loop 中开始监听并等待触发
42// 该事件会被添加到事件循环中,
43void ev_async_start(struct ev_loop *loop, ev_async *w);
44
45// 发送一个异步事件到事件循环中,通常用来唤醒事件循环中的等待状态
46// 一旦事件被发送,事件循环将处理该异步事件
47void ev_async_send(struct ev_loop *loop, ev_async *w);
48

使用样例

publish.cc 文件:

1#include <ev.h>
2#include <amqpcpp.h>
3#include <amqpcpp/libev.h>
4#include <openssl/ssl.h>
5#include <openssl/opensslv.h>
6
7int main()
8{
9    // 1. 实例化底层网络通信框架的I/O事件监控句柄
10    auto *loop = EV_DEFAULT;
11
12    // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
13    AMQP::LibEvHandler handler(loop);
14
15    // 2.5 实例化连接对象
16    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
17    AMQP::TcpConnection connection(&handler, address);
18
19    // 3. 实例化信道对象
20    AMQP::TcpChannel channel(&connection);
21
22    // 4. 声明交换机
23    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) // 一对一消息传递模式
24        .onError([](const char *message) {
25            std::cout << "声明交换机失败:" << message << std::endl;
26            exit(0);
27        })
28        .onSuccess([](){
29            std::cout << "test-exchange 交换机创建成功!" << std::endl;
30        });
31
32    // 5. 声明队列
33    channel.declareQueue("test-queue")
34        .onError([](const char *message) {
35            std::cout << "声明队列失败:" << message << std::endl;
36            exit(0);
37        })
38        .onSuccess([](){
39            std::cout << "test-queue 队列创建成功!" << std::endl;
40        });
41
42    // 6. 针对交换机和队列进行绑定
43    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
44        .onError([](const char *message) {
45            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
46            exit(0);
47        })
48        .onSuccess([](){
49            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
50        });
51
52    // 7. 向交换机发布消息
53    for (int i = 0; i < 10; i++) {
54        std::string msg = "Hello lirendada-" + std::to_string(i);
55        bool ret = channel.publish("test-exchange", "test-queue-key", msg);
56        if (ret == false) {
57            std::cout << "publish 失败!\n";
58        }
59    }
60
61    // 启动底层网络通信框架--开启I/O
62    ev_run(loop, 0);
63    return 0;
64}
65

consume.cc 文件:

1#include <ev.h>
2#include <amqpcpp.h>
3#include <amqpcpp/libev.h>
4#include <openssl/ssl.h>
5#include <openssl/opensslv.h>
6
7//消息回调处理函数的实现
8void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
9{
10    std::string msg;
11    msg.assign(message.body(), message.bodySize());
12    std::cout << msg << std::endl;
13    channel->ack(deliveryTag); // 对消息进行确认
14}
15
16int main()
17{
18    // 1. 实例化底层网络通信框架的I/O事件监控句柄
19    auto *loop = EV_DEFAULT;
20
21    // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
22    AMQP::LibEvHandler handler(loop);
23
24    // 2.5 实例化连接对象
25    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
26    AMQP::TcpConnection connection(&handler, address);
27
28    // 3. 实例化信道对象
29    AMQP::TcpChannel channel(&connection);
30
31    // 4. 声明交换机
32    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
33        .onError([](const char *message) {
34            std::cout << "声明交换机失败:" << message << std::endl;
35            exit(0);
36        })
37        .onSuccess([](){
38            std::cout << "test-exchange 交换机创建成功!" << std::endl;
39        });
40
41    // 5. 声明队列
42    channel.declareQueue("test-queue")
43        .onError([](const char *message) {
44            std::cout << "声明队列失败:" << message << std::endl;
45            exit(0);
46        })
47        .onSuccess([](){
48            std::cout << "test-queue 队列创建成功!" << std::endl;
49        });
50
51    // 6. 针对交换机和队列进行绑定
52    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
53        .onError([](const char *message) {
54            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
55            exit(0);
56        })
57        .onSuccess([](){
58            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
59        });
60
61    // 7. 订阅队列消息 -- 设置消息处理回调函数
62    auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
63    channel.consume("test-queue", "consume-tag")  //返回值 DeferredConsumer
64        .onReceived(callback)
65        .onError([](const char *message){
66            std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;
67            exit(0);
68        }); // 返回值是 AMQP::Deferred
69    
70    // 8. 启动底层网络通信框架--开启I/O
71    ev_run(loop, 0);
72    return 0;
73}
74

makefile 文件:

1all : publish consume
2publish : publish.cc
3	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
4consume : consume.cc
5	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
6

Ⅳ. 二次封装

​ 在项目中使用 rabbitmq 的时候,我们目前只需要交换机与队列的直接交换,实现一台主机将消息发布给另一台主机进行处理的功能,因此在这里可以对 rabbitmq 的操作进行简单的封装,使 rabbitmq 的操作在项目中更加简便:

封装一个 MQClient

  • 提供声明和绑定指定交换机与队列的功能
  • 提供向指定交换机发布消息的功能
  • 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能
1// rabbitmq.hpp
2#pragma once
3#include <ev.h>
4#include <amqpcpp.h>
5#include <amqpcpp/libev.h>
6#include <openssl/ssl.h>
7#include <openssl/opensslv.h>
8#include <iostream>
9#include <functional>
10#include "logger.hpp"
11
12class MQClient 
13{
14public:
15    using ptr = std::shared_ptr<MQClient>;
16    using MessageCallback = std::function<void(const char*, size_t)>; // 定义消息回调类型
17
18    // 构造函数,初始化连接信息,连接到AMQP服务器
19    MQClient(const std::string &user, 
20             const std::string passwd,
21             const std::string host) 
22    {
23        // 初始化Libev事件循环
24        _loop = EV_DEFAULT;
25        // 创建AMQP的LibEv事件循环处理器
26        _handler = std::make_unique<AMQP::LibEvHandler>(_loop);
27
28        // 构建AMQP连接地址(例如:amqp://root:123456@127.0.0.1:5672/)
29        std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
30        AMQP::Address address(url);
31
32        // 创建TCP连接
33        _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
34        // 创建TCP通道
35        _channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
36
37        // 启动一个新线程,运行事件循环
38        _loop_thread = std::thread([this]() {
39            ev_run(_loop, 0);
40        });
41    }
42
43    // 析构函数,关闭事件循环并清理资源
44    ~MQClient() 
45    {
46            // 初始化异步事件
47            ev_async_init(&_async_watcher, watcher_callback);
48            ev_async_start(_loop, &_async_watcher);
49            ev_async_send(_loop, &_async_watcher);
50
51            // 等待事件循环线程结束
52            _loop_thread.join();
53            _loop = nullptr;
54    }
55
56    // 声明交换机、队列及绑定
57    void declareComponents(const std::string &exchange, // 交换机名称
58                           const std::string &queue,    // 队列名称
59                           const std::string &routing_key = "routing_key",
60                           AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) // 一对一模式
61    {
62        // 声明交换机
63        _channel->declareExchange(exchange, echange_type)
64            .onError([](const char *message) {
65                LOG_ERROR("声明交换机失败:{}", message);
66                exit(0);
67            })
68            .onSuccess([exchange](){
69                LOG_ERROR("{} 交换机创建成功!", exchange);
70            });
71
72        // 声明队列
73        _channel->declareQueue(queue)
74            .onError([](const char *message) {
75                LOG_ERROR("声明队列失败:{}", message);
76                exit(0);
77            })
78            .onSuccess([queue](){
79                LOG_ERROR("{} 队列创建成功!", queue);
80            });
81
82        // 绑定交换机与队列
83        _channel->bindQueue(exchange, queue, routing_key)
84            .onError([exchange, queue](const char *message) {
85                LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
86                exit(0);
87            })
88            .onSuccess([exchange, queue, routing_key](){
89                LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
90            });
91    }
92
93    // 发布消息到指定交换机
94    bool publish(const std::string &exchange, 
95                 const std::string &msg, 
96                 const std::string &routing_key = "routing_key")
97    {
98        LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
99        bool ret = _channel->publish(exchange, routing_key, msg);
100        if (ret == false) {
101            LOG_ERROR("{} 发布消息失败:", exchange);
102            return false;
103        }
104        return true;
105    }
106
107    // 从指定队列消费消息
108    void consume(const std::string &queue, const MessageCallback &cb) 
109    {
110        LOG_DEBUG("开始订阅 {} 队列消息!", queue);
111        _channel->consume(queue, "consume-tag")  // 返回值为DeferredConsumer,而不是Deferred,所以要先用onReceived再用onError
112            .onReceived([this, cb](const AMQP::Message &message, 
113                                   uint64_t deliveryTag, 
114                                   bool redelivered) 
115            {
116                cb(message.body(), message.bodySize()); // 消息处理回调
117                _channel->ack(deliveryTag); // 确认消息处理
118            })
119            .onError([queue](const char *message){
120                LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
121                exit(0);
122            });
123    }
124
125private:
126    // Libev事件循环异步回调
127    static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
128        ev_break(loop, EVBREAK_ALL);  // 停止事件循环
129    }
130
131private:
132    struct ev_async _async_watcher;  // 异步事件监视器
133    struct ev_loop *_loop;           // Libev事件循环
134    std::unique_ptr<AMQP::LibEvHandler> _handler;      // Libev事件处理器
135    std::unique_ptr<AMQP::TcpConnection> _connection;  // TCP连接
136    std::unique_ptr<AMQP::TcpChannel> _channel;        // AMQP通道
137    std::thread _loop_thread;  // 事件循环线程
138};
139

测试代码

publish.cc 文件:

1#include "../../../header/rabbitmq.hpp"
2#include "../../../header/logger.hpp"
3#include <gflags/gflags.h>
4
5DEFINE_string(user, "root", "rabbitmq访问用户名");
6DEFINE_string(pswd, "123456", "rabbitmq访问密码");
7DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
8
9DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
10DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
11DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
12
13int main(int argc, char *argv[])
14{
15    google::ParseCommandLineFlags(&argc, &argv, true);
16    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
17
18    // 创建客户端对象
19    MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
20
21    // 声明交换机、队列及绑定
22    client.declareComponents("test-exchange", "test-queue");
23
24    // 发布消息
25    for (int i = 0; i < 10; i++) 
26    {
27        std::string msg = "Hello lirendada-" + std::to_string(i);
28        bool ret = client.publish("test-exchange", msg);
29        if (ret == false) {
30            LOG_WARN("publish 失败!\n");
31        }
32    }
33    std::this_thread::sleep_for(std::chrono::seconds(3));
34    return 0;
35}
36

consume.cc 文件:

1#include "../../../header/rabbitmq.hpp"
2#include "../../../header/logger.hpp"
3#include <gflags/gflags.h>
4
5DEFINE_string(user, "root", "rabbitmq访问用户名");
6DEFINE_string(pswd, "123456", "rabbitmq访问密码");
7DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
8
9DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
10DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
11DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
12
13void callback(const char *body, size_t sz)
14{
15    std::string msg;
16    msg.assign(body, sz);
17    LOG_DEBUG("{}", msg);
18}
19
20int main(int argc, char *argv[])
21{
22    google::ParseCommandLineFlags(&argc, &argv, true);
23    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
24
25    // 创建客户端对象
26    MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
27
28    // 声明交换机、队列及绑定
29    client.declareComponents("test-exchange", "test-queue");
30
31    // 订阅消息
32    client.consume("test-queue", callback);
33    std::this_thread::sleep_for(std::chrono::seconds(60));
34    return 0;
35}
36
37

makefile 文件:

1all : publish consume
2publish : publish.cc
3	g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
4consume : consume.cc
5	g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
6

在这里插入图片描述


【c++中间件】RabbitMQ介绍 && AMQP-CPP库的使用 && 二次封装》 是转载文章,点击查看原文


相关推荐


首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2025 XYZ博客

Powered by 聚合阅读