文章目录
- Ⅰ. 安装 RabbitMQ
-
- RabbitMQ 服务安装与使用
- 安装 RabbitMQ 的 C++ 客户端库
-
- 安装 AMQP-CPP
- Ⅱ. RabbitMQ 的介绍
-
-
- RabbitMQ 服务与客户端的通信原理
- 简单通信流程
- RabbitMQ 服务与客户端的通信原理
-
- Ⅲ. AMQP-CPP 库的简单使用
-
- 一、介绍
- 二、使用
- 三、常用类与接口介绍
-
- Channel 类
- libev
- Channel 类
- 使用样例
- Ⅳ. 二次封装
-
-
- 测试代码
-

Ⅰ. 安装 RabbitMQ
RabbitMQ 服务安装与使用
1sudo apt install -y rabbitmq-server 2
一般来说默认端口为 15672。
1# 启动服务 2sudo systemctl start rabbitmq-server.service 3# 查看服务状态 4sudo systemctl status rabbitmq-server.service 5 6# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息: 7 # 添加用户和密码 8sudo rabbitmqctl add_user root 123456 9 # 设置用户tag 10sudo rabbitmqctl set_user_tags root administrator 11 # 设置用户权限 12sudo rabbitmqctl set_permissions -p / root "." "." ".*" 13 14# RabbitMQ 自带了 web 管理界面,执行下面命令开启 15sudo rabbitmq-plugins enable rabbitmq_management 16
网页访问的默认端口为 15672:(注意 rabbitmq 服务的端口是 5672,注意区分!)

安装 RabbitMQ 的 C++ 客户端库
- C 语言库:https://github.com/alanxz/rabbitmq-c
- C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
我们这里使用 AMQP-CPP 库来编写客户端程序!
安装 AMQP-CPP
1sudo apt install -y libev-dev #libev 网络库组件 2git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git 3cd AMQP-CPP/ 4make 5sudo make install 6
如果安装时候出现了
SSL相关的版本问题,可以通过以下命令来解决:1dpkg -l |grep ssl 2sudo dpkg -P --force-all libevent-openssl-2.1-7 3sudo dpkg -P --force-all openssl 4sudo dpkg -P --force-all libssl-dev 5sudo apt --fix-broken install 6 然后重新
make即可!
Ⅱ. RabbitMQ 的介绍
RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)的开源消息代理,广泛用于消息传递和消息队列管理。其核心功能是 提供异步的消息传递机制,支持多个消费者和生产者之间的松耦合通信。
RabbitMQ 服务与客户端的通信原理
- 连接建立(Connection)
- 客户端通过
AMQP协议与RabbitMQ服务器建立连接。RabbitMQ使用TCP/IP作为传输协议,默认端口为 5672。 - 客户端首先创建一个与
RabbitMQ的TCP连接,这个连接是双向的,可以承载多个 通道(Channel)。
- 通道(Channel)
- 在一个
TCP连接上,客户端和RabbitMQ可以开辟多个通道。每个通道都是一个独立的、轻量级的通信渠道,用于进行消息的发送与接收。 - 每个通道都对应一个
AMQP会话,所有操作(如声明队列、发布消息、消费消息等)都通过通道进行。 - 这样做的好处是,一个
TCP连接可以并发处理多个请求,减少了连接的开销和延迟。
- 声明队列(Queue)
- 在客户端和
RabbitMQ服务器之间进行消息传递前,客户端需要声明一个队列。队列是消息的存储位置,消息会被传递到指定的队列中。 - 队列是由生产者声明的,消费者会从队列中获取消息!
- 也可以通过
Durability和Persistence来确保队列和消息在RabbitMQ服务器重启后不丢失。
- 交换机(Exchange)
- 交换机是
RabbitMQ中的核心组件,负责根据路由规则将消息分发到适当的队列。客户端将消息发送到交换机,而交换机会根据绑定的路由键(routing key)将消息路由到对应的队列。 RabbitMQ支持几种类型的交换机:- Direct Exchange:根据路由键一对一将消息发送到队列。
- Fanout Exchange:将消息广播到所有绑定到它的队列。
- Topic Exchange:根据路由键模式(类似正则表达式)将消息路由到一个或多个队列。
- Headers Exchange:通过消息头部的属性来决定消息的路由。
- 发布消息(Publishing)
- 生产者(
Producer):生产者将消息发布到交换机。生产者通常不会直接与队列通信,而是将消息发送到交换机,交换机决定消息的路由。 - 消息可以携带属性,如
routing key(路由键),用于帮助交换机决定消息的路由方向。 - 如果消息发布时队列未创建,生产者可以要求
RabbitMQ自动创建队列,或自己事先声明队列。
- 消息消费(Consuming)
- 消费者(
Consumer):消费者订阅队列,从队列中获取消息并进行处理。一个队列可以有多个消费者,RabbitMQ会轮流将消息分发给所有绑定的消费者。 - 当消息被消费者成功消费后,可以根据消息的 确认机制 来决定是否从队列中删除消息。
- 消费者可以使用两种主要的确认方式:
- 自动确认:消费者在获取消息后会自动确认消息已处理,
RabbitMQ会将该消息从队列中移除。 - 手动确认:消费者处理完消息后需要明确调用确认方法,确保消息在处理完成后才会被移除。
- 自动确认:消费者在获取消息后会自动确认消息已处理,
- 消息确认(Acknowledgment)
- 为了保证消息的可靠性,
RabbitMQ提供了消息确认机制。当消费者从队列中获取到消息后,可以选择确认消息,表示消息已成功处理。 - 如果消费者在处理消息时出现异常,消息不会被确认,
RabbitMQ会将消息重新放入队列供其他消费者重新消费(也可以设置消息的重试次数和延迟时间)。
- 消息持久化(Persistence)
- 队列持久化:在声明队列时,可以将其设置为持久化,确保即使
RabbitMQ服务器重启,队列本身依然存在。 - 消息持久化:可以通过将消息标记为持久化(
delivery_mode=2)来确保消息在RabbitMQ重启后不丢失。
- 消息路由(Routing)
- 交换机根据绑定到它的队列和路由键,决定如何将消息路由到一个或多个队列。
- 如果消息没有找到匹配的队列,通常情况下,消息会被丢弃,除非启用了死信队列
DLQ(Dead Letter Queue)来接收无法路由的消息。
简单通信流程

- 生产者 将消息发送到指定的 交换机,消息携带一个路由键。
- 交换机 根据路由规则将消息发送到一个或多个队列。
- 消费者 监听并消费从队列中获取到的消息,进行处理。
- 消费者处理完消息后,通过确认机制告知
RabbitMQ消息已处理。 RabbitMQ根据确认机制和消息持久化设置,确保消息不会丢失。
Ⅲ. AMQP-CPP 库的简单使用
一、介绍
AMQP-CPP 是一个用于实现 AMQP (Advanced Message Queuing Protocol) 协议的 C++ 库。AMQP 是一种广泛应用于消息中间件的开放标准协议,支持在分布式系统中高效、安全地交换消息。AMQP-CPP 库使得 C++ 开发者能够轻松实现 AMQP 协议的客户端和服务器,通常用于与 RabbitMQ 等消息中间件进行交互。
AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
注意事项:这个库需要 c++17 的支持。
主要功能有:
- 连接和通道管理:
AMQP-CPP支持通过AMQP协议连接到RabbitMQ服务器,并能够创建多个通道进行并发消息处理。 - 消息发布与订阅:可以方便地进行消息发布(生产者)和订阅(消费者),支持不同类型的交换机(
Direct、Topic、Fanout等)和队列。 - 队列管理:支持创建、删除、绑定和监听队列。
- 异常处理和重试机制:内置异常处理,能够在连接断开后自动进行重试,确保消息通信的稳定性。
二、使用
通常 AMQP-CPP 的使用有两种模式:
- 使用默认的
TCP模块进行网络通信(相当于自己手动实现,比较麻烦!)- 实现一个类继承自
AMQP::TcpHandler类, 它负责网络层的TCP连接 - 重写相关函数, 其中必须重写
monitor函数 - 在
monitor函数中需要实现的是将fd放入eventloop(select、epoll)中监控, 当fd可写可读就绪之后, 调用AMQP-CPP的connection->process(fd, flags)方法
- 实现一个类继承自
- 使用扩展的
libevent、libev、libuv、asio异步通信组件进行通信(相当于有现成的组件供我们直接使用,我们优先用这种模式!)- 以
libev为例, 我们不必要自己实现monitor函数, 可以直接使用AMQP::LibEvHandler
- 以
在本项目编译链接的时候,记得链接动态库:-lamqpcpp -lev。
三、常用类与接口介绍
Channel 类
channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel。
因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。
1namespace AMQP { 2 /** 3 * 通用回调函数类型,用于许多延迟处理对象 4 */ 5 using SuccessCallback = std::function<void()>; // 成功回调函数类型,无参数 6 using ErrorCallback = std::function<void(const char *message)>; // 错误回调函数类型,带错误信息 7 using FinalizeCallback = std::function<void()>; // 完成回调函数类型,无参数 8 9 // 队列回调,返回队列名称、消息数和消费者数 10 using QueueCallback = std::function<void(const std::string &name, 11 uint32_t messagecount, 12 uint32_t consumercount)>; 13 // 删除队列回调,返回删除的消息数 14 using DeleteCallback = std::function<void(uint32_t deletedmessages)>; 15 16 // 消息接收回调,返回消息对象、交付标签和是否重新投递的标志 17 using MessageCallback = std::function<void(const Message &message, 18 uint64_t deliveryTag, 19 bool redelivered)>; 20 /** 21 * 发布者确认回调,返回消息交付标签和是否是多个消息 22 * 当服务器确认消息已被接收和处理时,触发此回调 23 */ 24 using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; 25 26 /** 27 * 使用确认包裹通道时,当消息被 ack/nacked 时,调用这些回调 28 */ 29 using PublishAckCallback = std::function<void()>; // 发布确认回调 30 using PublishNackCallback = std::function<void()>; // 发布未确认回调 31 using PublishLostCallback = std::function<void()>; // 发布消息丢失回调 32 33 /** 34 * Channel 类定义 35 * 管理与 RabbitMQ 服务器之间的消息通道 36 */ 37 class Channel { 38 public: 39 // 构造函数,创建一个通道,并与指定连接绑定 40 Channel(Connection *connection); 41 42 // 检查通道是否已经连接 43 bool connected(); 44 45 /** 46 * 声明交换机 47 * 如果交换机名称为空,服务器会自动分配一个名称 48 * 支持的交换机类型:fanout(广播)、direct(直接)、topic(主题)、headers、consistent_hash、message_deduplication 49 * 以下 flags 可用于交换机: 50 * -durable 持久化,重启后交换机依然有效 51 * -autodelete 删除所有连接的队列后,自动删除交换 52 * -passive 仅被动检查交换机是否存在 53 * -internal 创建内部交换 54 * 此函数返回一个延迟处理程序,可以安装回调 using onSuccess(), onError() 和 onFinalize() 55 * 这里我们只用到 onError(),如下所示: 56 * Deferred &onError(const char *message) 57 */ 58 Deferred &declareExchange( 59 const std::string_view &name, // 交换机名称 60 ExchangeType type, // 交换机类型 61 int flags, // 交换机标志 62 const Table &arguments // 其他参数 63 ); 64 65 /** 66 * 声明队列 67 * 如果队列名称为空,服务器会自动分配一个名称 68 * flags 可以是以下值的组合: 69 * -durable 持久队列在代理重新启动后仍然有效 70 * -autodelete 当所有连接的使用者都离开时,自动删除队列 71 * -passive 仅被动检查队列是否存在 72 * -exclusive 队列仅存在于此连接,并且在连接断开时自动删除 73 * 74 * 可以安装的 onSuccess()回调应该具有以下签名: 75 void myCallback(const std::string &name, 76 uint32_t messageCount, 77 uint32_t consumerCount); 78 例如: 79 channel.declareQueue("myqueue").onSuccess( 80 [](const std::string &name, 81 uint32_t messageCount, 82 uint32_t consumerCount) { 83 std::cout << "Queue '" << name << "' "; 84 std::cout << "has been declared with "; 85 std::cout << messageCount; 86 std::cout << " messages and "; 87 std::cout << consumerCount; 88 std::cout << " consumers" << std::endl; 89 }); 90 */ 91 DeferredQueue &declareQueue( 92 const std::string_view &name, // 队列名称 93 int flags, // 队列标志 94 const Table &arguments // 其他参数 95 ); 96 97 /** 98 * 将队列绑定到交换机 99 * 参数:交换机名称、队列名称、路由密钥以及其他绑定参数 100 */ 101 Deferred &bindQueue( 102 const std::string_view &exchange, // 交换机名称 103 const std::string_view &queue, // 队列名称 104 const std::string_view &routingkey, // 路由密钥 105 const Table &arguments // 其他绑定参数 106 ); 107 108 /** 109 * 将消息发布到交换机 110 * 必须提供交换机的名称和路由密钥,然后 RabbitMQ 会将消息发送到符合条件的队列 111 * 可选的 flags 参数可以指定如果消息无法路由到队列时应该发生的情况。默认情况下,不可更改的消息将被静默地丢弃。 112 * 如果设置了 mandatory 或 immediate 标志,则无法处理的消息将返回到应用程序。 113 * 在开始发布之前,请确保已经调用了 recall() 方法,并设置了所有适当的处理程序来处理这些返回的消息。 114 * 可以提供以下 flags: 115 * -mandatory 如果设置,服务器将返回未发送到队列的消息 116 * -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。 117 */ 118 bool publish( 119 const std::string_view &exchange, // 交换机名称 120 const std::string_view &routingKey, // 路由密钥 121 const std::string &message, // 消息内容 122 int flags = 0 // 可选标志 123 ); 124 125 /** 126 * 告诉 RabbitMQ 服务器我们已经准备好开始消费队列消息 127 * 如果未指定消费者标签,服务器会自动分配一个 128 * 支持以下 flags: 129 * -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息 130 * -noack 如果设置了,则不必对已消费的消息进行确认 131 * -exclusive 请求独占访问,只有此使用者可以访问队列 132 133 * 可以安装的 onSuccess()回调应该具有以下格式: 134 void myCallback(const std::string_view&tag); 135 样例: 136 channel.consume("myqueue").onSuccess([](const std::string_view& tag) { 137 std::cout << "Started consuming under tag "; 138 std::cout << tag << std::endl; 139 }); 140 */ 141 DeferredConsumer &consume( 142 const std::string_view &queue, // 队列名称 143 const std::string_view &tag, // 消费者标签 144 int flags, // 消费者标志 145 const Table &arguments // 其他参数 146 ); 147 148 /** 149 * 确认接收到的消息 150 * 消费者需要确认收到消息,告知 RabbitMQ 将消息从队列中移除 151 * 支持多条确认:之前所有未确认的消息将一并确认 152 * 当在 DeferredConsumer::onReceived() 方法中接收到消息时,必须确认该消息,以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。 153 */ 154 bool ack(uint64_t deliveryTag, int flags = 0); // 确认消息,返回是否成功 155 }; 156 157 /** 158 * DeferredConsumer 类定义 159 * 管理延迟消费者,支持注册回调函数处理不同的事件 160 */ 161 class DeferredConsumer { 162 public: 163 // 注册消费者启动时的回调函数 164 DeferredConsumer &onSuccess(const ConsumeCallback& callback); 165 166 // 注册接收到消息时的回调函数 167 DeferredConsumer &onReceived(const MessageCallback& callback); 168 169 // 注册接收到消息的别名 170 DeferredConsumer &onMessage(const MessageCallback& callback); 171 172 // 注册服务器取消消费者时的回调函数 173 DeferredConsumer &onCancelled(const CancelCallback& callback); 174 }; 175 176 /** 177 * 消息类,代表通过 RabbitMQ 发送的消息 178 */ 179 class Message : public Envelope { 180 public: 181 const std::string &exchange(); // 获取消息的交换机 182 const std::string &routingkey(); // 获取消息的路由键 183 }; 184 185 /** 186 * 信封类,代表包含元数据和消息体的结构 187 */ 188 class Envelope : public MetaData { 189 public: 190 const char *body(); // 获取消息体内容(没有以'\0'结尾,所以获取时候需要结合下面的bodySize()接口) 191 uint64_t bodySize(); // 获取消息体的大小 192 }; 193} 194
libev
libev 是一个高效的事件循环库,广泛用于事件驱动的编程模型中。它提供了一个轻量级、低延迟的事件循环实现,能够高效地处理 I/O 操作、定时器和其他异步事件。比如:
ev_loop:事件循环对象,管理和调度事件ev_async:用于处理异步事件的结构体,通常用于线程间的异步通知ev_run:启动事件循环ev_break:退出事件循环
1// 定义 ev_async 结构体,表示异步事件 2typedef struct ev_async { 3 EV_WATCHER(ev_async) // 宏,标记此结构体为事件观察者(watcher) 4 EV_ATOMIC_T sent; // 标记是否发送过异步事件,通常是一个原子操作标志 5} ev_async; 6 7// 事件循环的中断类型枚举 8enum { 9 EVBREAK_CANCEL = 0, // 取消循环,撤销unloop操作 10 EVBREAK_ONE = 1, // 退出一次循环,只停止当前事件循环 11 EVBREAK_ALL = 2 // 退出所有事件循环,完全停止 12}; 13 14// 默认事件循环初始化函数 15// flags 参数可以指定额外的标志,默认值为 0 16// EV_CPP 是用于 C++ 环境的一个宏,通常设为0表示默认行为 17struct ev_loop *ev_default_loop(unsigned int flags EV_CPP (= 0)); 18 19// 宏定义,表示初始化默认的事件循环 20# define EV_DEFAULT ev_default_loop(0) 21 22// 启动事件循环,loop 是事件循环对象 23// 事件循环将监听并处理所有注册的事件和回调 24int ev_run(struct ev_loop *loop); 25 26// 退出事件循环 27// break_type 表示退出的类型,控制如何退出循环 28// -EVBREAK_CANCEL: 撤销之前的 unloop 操作 29// -EVBREAK_ONE: 停止一次循环 30// -EVBREAK_ALL: 停止所有循环,彻底退出 31void ev_break(struct ev_loop *loop, int32_t break_type); 32 33// 异步事件回调函数的类型定义 34// 该回调函数会在事件触发时被调用 35void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents); 36 37// 初始化一个 ev_async 结构体,并设置其回调函数 38// 当事件触发时,ev_async 的回调函数会被调用 39void ev_async_init(ev_async *w, callback cb); 40 41// 将异步事件 watcher 添加到事件循环 loop 中开始监听并等待触发 42// 该事件会被添加到事件循环中, 43void ev_async_start(struct ev_loop *loop, ev_async *w); 44 45// 发送一个异步事件到事件循环中,通常用来唤醒事件循环中的等待状态 46// 一旦事件被发送,事件循环将处理该异步事件 47void ev_async_send(struct ev_loop *loop, ev_async *w); 48
使用样例
publish.cc 文件:
1#include <ev.h> 2#include <amqpcpp.h> 3#include <amqpcpp/libev.h> 4#include <openssl/ssl.h> 5#include <openssl/opensslv.h> 6 7int main() 8{ 9 // 1. 实例化底层网络通信框架的I/O事件监控句柄 10 auto *loop = EV_DEFAULT; 11 12 // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来 13 AMQP::LibEvHandler handler(loop); 14 15 // 2.5 实例化连接对象 16 AMQP::Address address("amqp://root:123456@127.0.0.1:5672/"); 17 AMQP::TcpConnection connection(&handler, address); 18 19 // 3. 实例化信道对象 20 AMQP::TcpChannel channel(&connection); 21 22 // 4. 声明交换机 23 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) // 一对一消息传递模式 24 .onError([](const char *message) { 25 std::cout << "声明交换机失败:" << message << std::endl; 26 exit(0); 27 }) 28 .onSuccess([](){ 29 std::cout << "test-exchange 交换机创建成功!" << std::endl; 30 }); 31 32 // 5. 声明队列 33 channel.declareQueue("test-queue") 34 .onError([](const char *message) { 35 std::cout << "声明队列失败:" << message << std::endl; 36 exit(0); 37 }) 38 .onSuccess([](){ 39 std::cout << "test-queue 队列创建成功!" << std::endl; 40 }); 41 42 // 6. 针对交换机和队列进行绑定 43 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") 44 .onError([](const char *message) { 45 std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl; 46 exit(0); 47 }) 48 .onSuccess([](){ 49 std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; 50 }); 51 52 // 7. 向交换机发布消息 53 for (int i = 0; i < 10; i++) { 54 std::string msg = "Hello lirendada-" + std::to_string(i); 55 bool ret = channel.publish("test-exchange", "test-queue-key", msg); 56 if (ret == false) { 57 std::cout << "publish 失败!\n"; 58 } 59 } 60 61 // 启动底层网络通信框架--开启I/O 62 ev_run(loop, 0); 63 return 0; 64} 65
consume.cc 文件:
1#include <ev.h> 2#include <amqpcpp.h> 3#include <amqpcpp/libev.h> 4#include <openssl/ssl.h> 5#include <openssl/opensslv.h> 6 7//消息回调处理函数的实现 8void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) 9{ 10 std::string msg; 11 msg.assign(message.body(), message.bodySize()); 12 std::cout << msg << std::endl; 13 channel->ack(deliveryTag); // 对消息进行确认 14} 15 16int main() 17{ 18 // 1. 实例化底层网络通信框架的I/O事件监控句柄 19 auto *loop = EV_DEFAULT; 20 21 // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来 22 AMQP::LibEvHandler handler(loop); 23 24 // 2.5 实例化连接对象 25 AMQP::Address address("amqp://root:123456@127.0.0.1:5672/"); 26 AMQP::TcpConnection connection(&handler, address); 27 28 // 3. 实例化信道对象 29 AMQP::TcpChannel channel(&connection); 30 31 // 4. 声明交换机 32 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) 33 .onError([](const char *message) { 34 std::cout << "声明交换机失败:" << message << std::endl; 35 exit(0); 36 }) 37 .onSuccess([](){ 38 std::cout << "test-exchange 交换机创建成功!" << std::endl; 39 }); 40 41 // 5. 声明队列 42 channel.declareQueue("test-queue") 43 .onError([](const char *message) { 44 std::cout << "声明队列失败:" << message << std::endl; 45 exit(0); 46 }) 47 .onSuccess([](){ 48 std::cout << "test-queue 队列创建成功!" << std::endl; 49 }); 50 51 // 6. 针对交换机和队列进行绑定 52 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") 53 .onError([](const char *message) { 54 std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl; 55 exit(0); 56 }) 57 .onSuccess([](){ 58 std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; 59 }); 60 61 // 7. 订阅队列消息 -- 设置消息处理回调函数 62 auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); 63 channel.consume("test-queue", "consume-tag") //返回值 DeferredConsumer 64 .onReceived(callback) 65 .onError([](const char *message){ 66 std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl; 67 exit(0); 68 }); // 返回值是 AMQP::Deferred 69 70 // 8. 启动底层网络通信框架--开启I/O 71 ev_run(loop, 0); 72 return 0; 73} 74
makefile 文件:
1all : publish consume 2publish : publish.cc 3 g++ -std=c++17 $^ -o $@ -lamqpcpp -lev 4consume : consume.cc 5 g++ -std=c++17 $^ -o $@ -lamqpcpp -lev 6
Ⅳ. 二次封装
在项目中使用 rabbitmq 的时候,我们目前只需要交换机与队列的直接交换,实现一台主机将消息发布给另一台主机进行处理的功能,因此在这里可以对 rabbitmq 的操作进行简单的封装,使 rabbitmq 的操作在项目中更加简便:
封装一个 MQClient:
- 提供声明和绑定指定交换机与队列的功能
- 提供向指定交换机发布消息的功能
- 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能
1// rabbitmq.hpp 2#pragma once 3#include <ev.h> 4#include <amqpcpp.h> 5#include <amqpcpp/libev.h> 6#include <openssl/ssl.h> 7#include <openssl/opensslv.h> 8#include <iostream> 9#include <functional> 10#include "logger.hpp" 11 12class MQClient 13{ 14public: 15 using ptr = std::shared_ptr<MQClient>; 16 using MessageCallback = std::function<void(const char*, size_t)>; // 定义消息回调类型 17 18 // 构造函数,初始化连接信息,连接到AMQP服务器 19 MQClient(const std::string &user, 20 const std::string passwd, 21 const std::string host) 22 { 23 // 初始化Libev事件循环 24 _loop = EV_DEFAULT; 25 // 创建AMQP的LibEv事件循环处理器 26 _handler = std::make_unique<AMQP::LibEvHandler>(_loop); 27 28 // 构建AMQP连接地址(例如:amqp://root:123456@127.0.0.1:5672/) 29 std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/"; 30 AMQP::Address address(url); 31 32 // 创建TCP连接 33 _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address); 34 // 创建TCP通道 35 _channel = std::make_unique<AMQP::TcpChannel>(_connection.get()); 36 37 // 启动一个新线程,运行事件循环 38 _loop_thread = std::thread([this]() { 39 ev_run(_loop, 0); 40 }); 41 } 42 43 // 析构函数,关闭事件循环并清理资源 44 ~MQClient() 45 { 46 // 初始化异步事件 47 ev_async_init(&_async_watcher, watcher_callback); 48 ev_async_start(_loop, &_async_watcher); 49 ev_async_send(_loop, &_async_watcher); 50 51 // 等待事件循环线程结束 52 _loop_thread.join(); 53 _loop = nullptr; 54 } 55 56 // 声明交换机、队列及绑定 57 void declareComponents(const std::string &exchange, // 交换机名称 58 const std::string &queue, // 队列名称 59 const std::string &routing_key = "routing_key", 60 AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) // 一对一模式 61 { 62 // 声明交换机 63 _channel->declareExchange(exchange, echange_type) 64 .onError([](const char *message) { 65 LOG_ERROR("声明交换机失败:{}", message); 66 exit(0); 67 }) 68 .onSuccess([exchange](){ 69 LOG_ERROR("{} 交换机创建成功!", exchange); 70 }); 71 72 // 声明队列 73 _channel->declareQueue(queue) 74 .onError([](const char *message) { 75 LOG_ERROR("声明队列失败:{}", message); 76 exit(0); 77 }) 78 .onSuccess([queue](){ 79 LOG_ERROR("{} 队列创建成功!", queue); 80 }); 81 82 // 绑定交换机与队列 83 _channel->bindQueue(exchange, queue, routing_key) 84 .onError([exchange, queue](const char *message) { 85 LOG_ERROR("{} - {} 绑定失败:", exchange, queue); 86 exit(0); 87 }) 88 .onSuccess([exchange, queue, routing_key](){ 89 LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key); 90 }); 91 } 92 93 // 发布消息到指定交换机 94 bool publish(const std::string &exchange, 95 const std::string &msg, 96 const std::string &routing_key = "routing_key") 97 { 98 LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key); 99 bool ret = _channel->publish(exchange, routing_key, msg); 100 if (ret == false) { 101 LOG_ERROR("{} 发布消息失败:", exchange); 102 return false; 103 } 104 return true; 105 } 106 107 // 从指定队列消费消息 108 void consume(const std::string &queue, const MessageCallback &cb) 109 { 110 LOG_DEBUG("开始订阅 {} 队列消息!", queue); 111 _channel->consume(queue, "consume-tag") // 返回值为DeferredConsumer,而不是Deferred,所以要先用onReceived再用onError 112 .onReceived([this, cb](const AMQP::Message &message, 113 uint64_t deliveryTag, 114 bool redelivered) 115 { 116 cb(message.body(), message.bodySize()); // 消息处理回调 117 _channel->ack(deliveryTag); // 确认消息处理 118 }) 119 .onError([queue](const char *message){ 120 LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message); 121 exit(0); 122 }); 123 } 124 125private: 126 // Libev事件循环异步回调 127 static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) { 128 ev_break(loop, EVBREAK_ALL); // 停止事件循环 129 } 130 131private: 132 struct ev_async _async_watcher; // 异步事件监视器 133 struct ev_loop *_loop; // Libev事件循环 134 std::unique_ptr<AMQP::LibEvHandler> _handler; // Libev事件处理器 135 std::unique_ptr<AMQP::TcpConnection> _connection; // TCP连接 136 std::unique_ptr<AMQP::TcpChannel> _channel; // AMQP通道 137 std::thread _loop_thread; // 事件循环线程 138}; 139
测试代码
publish.cc 文件:
1#include "../../../header/rabbitmq.hpp" 2#include "../../../header/logger.hpp" 3#include <gflags/gflags.h> 4 5DEFINE_string(user, "root", "rabbitmq访问用户名"); 6DEFINE_string(pswd, "123456", "rabbitmq访问密码"); 7DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port"); 8 9DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;"); 10DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件"); 11DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级"); 12 13int main(int argc, char *argv[]) 14{ 15 google::ParseCommandLineFlags(&argc, &argv, true); 16 init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level); 17 18 // 创建客户端对象 19 MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host); 20 21 // 声明交换机、队列及绑定 22 client.declareComponents("test-exchange", "test-queue"); 23 24 // 发布消息 25 for (int i = 0; i < 10; i++) 26 { 27 std::string msg = "Hello lirendada-" + std::to_string(i); 28 bool ret = client.publish("test-exchange", msg); 29 if (ret == false) { 30 LOG_WARN("publish 失败!\n"); 31 } 32 } 33 std::this_thread::sleep_for(std::chrono::seconds(3)); 34 return 0; 35} 36
consume.cc 文件:
1#include "../../../header/rabbitmq.hpp" 2#include "../../../header/logger.hpp" 3#include <gflags/gflags.h> 4 5DEFINE_string(user, "root", "rabbitmq访问用户名"); 6DEFINE_string(pswd, "123456", "rabbitmq访问密码"); 7DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port"); 8 9DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;"); 10DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件"); 11DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级"); 12 13void callback(const char *body, size_t sz) 14{ 15 std::string msg; 16 msg.assign(body, sz); 17 LOG_DEBUG("{}", msg); 18} 19 20int main(int argc, char *argv[]) 21{ 22 google::ParseCommandLineFlags(&argc, &argv, true); 23 init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level); 24 25 // 创建客户端对象 26 MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host); 27 28 // 声明交换机、队列及绑定 29 client.declareComponents("test-exchange", "test-queue"); 30 31 // 订阅消息 32 client.consume("test-queue", callback); 33 std::this_thread::sleep_for(std::chrono::seconds(60)); 34 return 0; 35} 36 37
makefile 文件:
1all : publish consume 2publish : publish.cc 3 g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags 4consume : consume.cc 5 g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags 6
