消息队列
难度等级:⭐⭐⭐ 前置知识:Web 后端开发 后续衔接:微服务架构、事件驱动架构
学习路径
- 入门阶段:理解消息队列的作用和基本概念
- 进阶阶段:掌握一种 MQ 的部署、使用、可靠性保证
- 精通阶段:能够设计高可用的消息中间件架构
一、消息队列基础
1.1 为什么需要消息队列
消息队列(Message Queue,简称 MQ)是分布式系统中重要的基础设施,主要解决以下问题:
异步处理:在同步调用场景中,用户注册需要写入数据库、发送验证邮件、初始化用户画像,整个过程可能需要 3 秒。引入消息队列后,主流程只写入数据库(200ms),其余操作通过消息异步完成,响应时间大幅降低。
削峰填谷:电商秒杀场景下,瞬时流量可能达到每秒数万请求,直接打到数据库会导致服务崩溃。消息队列作为缓冲区,将请求暂存后以数据库能承受的速度消费,保护下游系统。
系统解耦:订单系统完成支付后,需要通知库存系统扣减、物流系统发货、积分系统增加积分。如果采用同步调用,订单系统需要知道所有下游系统的接口,耦合严重。通过消息队列,订单系统只需发送一条消息,下游系统各自订阅,新增系统无需修改订单代码。
最终一致性:在分布式系统中,跨服务的数据一致性很难通过事务保证。消息队列结合本地事务表或事务消息,可以实现跨服务的最终一致性。
1.2 核心概念
Producer(生产者):发送消息的应用。生产者将消息发送到消息中间件,不关心谁消费、何时消费。
Consumer(消费者):接收并处理消息的应用。消费者从消息中间件拉取或订阅消息进行处理。
Broker(代理):消息队列的服务端,负责接收、存储、转发消息。一个 MQ 集群通常由多个 Broker 组成。
Topic(主题):消息的逻辑分类。生产者将消息发送到指定 Topic,消费者订阅感兴趣的 Topic。
Queue(队列):消息的物理存储单元。一个 Topic 可以包含多个 Queue,用于水平扩展。
Message(消息):传输的数据单元,通常包含 Header(元数据)、Body(消息体)、Properties(自定义属性)。
Offset(偏移量):消费者在队列中的消费位置标记,用于记录已消费到哪条消息。
1.3 消息模型
点对点模型(Point-to-Point):
- 每条消息只能被一个消费者消费
- 消费者消费后消息从队列中删除
- 适用于任务分发场景,如订单处理、工单分配
- 多个消费者竞争消费同一队列,实现负载均衡
生产者 → [Queue] → 消费者A(消费消息1)
→ 消费者B(消费消息2)
发布订阅模型(Publish-Subscribe):
- 每条消息可以被多个消费者组消费
- 每个消费者组独立维护消费进度
- 适用于事件通知场景,如订单完成通知库存、物流、积分系统
- 消费者组内保持点对点语义,组间保持广播语义
生产者 → [Topic] → 消费者组A → 消费者A1
→ 消费者组B → 消费者B1
→ 消费者B2
两种模型各有适用场景,现代 MQ 产品通常同时支持两种模型。
二、RabbitMQ
2.1 架构与协议
RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议实现,是开源消息中间件中协议支持最完善的产品。
AMQP 协议核心概念:
- Virtual Host:虚拟主机,用于逻辑隔离,类似数据库的 schema
- Exchange:交换机,接收生产者消息并根据路由规则转发到队列
- Binding:绑定关系,定义 Exchange 和 Queue 之间的路由规则
- Routing Key:路由键,生产者发送消息时指定,用于匹配 Binding
Exchange 类型:
| 类型 | 路由规则 | 典型场景 |
|---|---|---|
| Direct | 精确匹配 Routing Key | 单播、指定消费者 |
| Topic | 通配符匹配(* 匹配一个词,# 匹配零或多个词) | 多播、按规则路由 |
| Fanout | 广播到所有绑定队列 | 事件广播、日志分发 |
| Headers | 根据消息 Header 属性匹配 | 复杂条件路由 |
// Direct Exchange 示例
channel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("order.queue", true, false, false, null);
channel.queueBind("order.queue", "order.exchange", "order.created");
// 发送消息
channel.basicPublish("order.exchange", "order.created",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
2.2 消息确认机制
RabbitMQ 提供多层确认机制,确保消息不丢失:
Publisher Confirm(生产者确认):
- 生产者发送消息后,Broker 返回确认应答
- 开启方式:
channel.confirmSelect() - 同步确认:每条消息等待确认后再发下一条
- 异步确认:批量发送,通过回调处理确认结果
// 异步 Confirm 示例
channel.confirmSelect();
channel.addConfirmListener(
(deliveryTag, multiple) -> {
// 消息成功投递到 Broker
log.info("Message {} confirmed", deliveryTag);
},
(deliveryTag, multiple) -> {
// 消息投递失败,需要重试
log.error("Message {} nacked", deliveryTag);
}
);
Consumer Ack(消费者确认):
- 自动确认:消息投递给消费者后立即确认(可能丢失)
- 手动确认:消费者处理完成后显式调用
basicAck - 拒绝消息:
basicNack可设置是否重新入队
// 手动 Ack 示例
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
processMessage(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// requeue=false 时消息进入死信队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
channel.basicConsume("order.queue", false, deliverCallback, consumerTag -> {});
2.3 死信队列与延迟队列
死信队列(DLX - Dead Letter Exchange): 消息在以下情况成为死信:
- 消费者拒绝且 requeue=false
- 消息 TTL 过期
- 队列达到最大长度
配置死信队列:
// 主队列配置死信参数
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routing");
channel.queueDeclare("main.queue", true, false, false, args);
// 死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing");
延迟队列: RabbitMQ 原生不支持延迟队列,通过以下方式实现:
- TTL + DLX:设置消息 TTL,过期后转入死信队列,消费者从死信队列消费
- 延迟队列插件:安装
rabbitmq-delayed-message-exchange插件
// 插件方式实现延迟队列
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);
// 发送延迟消息
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000); // 延迟 5 秒
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers).build();
channel.basicPublish("delayed.exchange", "delayed.routing", props, message.getBytes());
2.4 集群与高可用
普通集群模式:
- 多个 Broker 节点组成集群
- 队列元数据在所有节点同步,但消息只存储在一个节点
- 访问非消息所在节点时需要内部转发,性能较差
镜像队列(Mirrored Queue):
- 队列消息在所有节点或指定节点镜像
- 主节点处理读写,从节点同步数据
- 主节点故障时自动选举新从节点为主节点
- 缺点:写性能下降,所有节点需存储完整消息
Quorum Queue(仲裁队列):
- RabbitMQ 3.8+ 引入,基于 Raft 协议实现
- 替代镜像队列,提供更好的数据安全性
- 至少 3 个节点,多数派写入成功即确认
- 支持在线扩容,推荐生产环境使用
# 声明仲裁队列
rabbitmqadmin declare queue name=quorum.queue durable=true arguments='{"x-queue-type":"quorum"}'
三、Kafka
3.1 核心架构
Kafka 是 LinkedIn 开源的分布式流平台,设计目标是高吞吐、可扩展、持久化。
核心组件:
- Topic:消息的逻辑分类,每个 Topic 包含多个 Partition
- Partition:分区,Topic 的并行单元,消息在 Partition 内有序
- Replica:副本,每个 Partition 有多个副本分布在不同 Broker
- ISR(In-Sync Replica):与 Leader 保持同步的副本集合
- Controller:集群控制器,负责分区选举、Broker 管理,从 0.11 版本后使用 ZK 选举
- ZooKeeper:早期版本用于元数据存储和 Controller 选举,KRaft 模式(2.8+)已可替代 ZK
副本机制:
- 每个 Partition 有一个 Leader 和多个 Follower
- 生产者和消费者只与 Leader 交互
- Follower 从 Leader 拉取数据保持同步
- Leader 故障时从 ISR 中选举新 Leader
# 创建 Topic,3 个分区,每个分区 2 个副本
kafka-topics.sh --create --topic order-topic \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 2
3.2 生产者
分区策略:
- 指定分区:直接指定 partition 字段
- 指定 Key:对 Key 取哈希后映射到分区
- 未指定:轮询(Round-Robin)分配
acks 配置:
acks=0:不等待确认,最高吞吐,可能丢消息acks=1:Leader 写入成功即确认,Leader 故障可能丢消息acks=all(或-1):ISR 全部写入成功,最安全
幂等性:
- 开启
enable.idempotence=true - 每个 Producer 有唯一 PID,每条消息有递增序列号
- Broker 检测重复消息并去重
- 保证单分区内精确一次语义
事务:
- 跨分区的事务支持
- 设置
transactional.id - 使用
initTransactions()、beginTransaction()、commitTransaction()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-topic", "key", "message"));
producer.close();
3.3 消费者
Consumer Group(消费者组):
- 同一组内消费者共同消费一个 Topic
- 每个 Partition 只能被组内一个消费者消费
- 消费者数超过分区数时,多余消费者空闲
Rebalance(重平衡):
- 消费者加入/退出、Topic 扩容时触发
- 所有消费者停止消费,重新分配分区
- 期间消费暂停,影响可用性
- 可通过
partition.assignment.strategy优化
Offset 管理:
- 旧版本存在 ZK,新版本存储在
__consumer_offsetsTopic - 自动提交:
enable.auto.commit=true,定期提交 - 手动提交:处理完成后调用
commitSync()或commitAsync()
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitSync(); // 手动提交 offset
}
3.4 存储机制
Segment 文件:
- 每个 Partition 对应一个目录
- 数据分为
.log(消息数据)和.index(偏移量索引) - 按段(Segment)切分,达到大小或时间阈值后滚动
- 文件名是起始 offset,便于快速定位
日志压缩(Log Compaction):
- 保留每个 Key 的最新值
- 后台线程定期清理旧版本
- 适用于配置更新、状态存储场景
零拷贝(Zero Copy):
- 使用
sendfile系统调用 - 数据从 Page Cache 直接发送到 Socket,不经过用户空间
- 大幅降低 CPU 和内存开销
磁盘 → Page Cache → Socket Buffer → 网卡
(跳过用户空间拷贝)
数据过期策略:
log.retention.hours:默认 7 天log.retention.bytes:按大小限制log.cleanup.policy:delete(删除)或 compact(压缩)
3.5 Kafka Streams
Kafka Streams 是 Kafka 提供的轻量级流处理库,无需独立集群。
核心概念:
- KStream:无限数据流,每条记录独立处理
- KTable:变更日志流,相同 Key 的最新值
- GlobalKTable:全局表,所有分区数据完整副本
窗口操作:
- 滚动窗口(Tumbling Window):固定大小,不重叠
- 滑动窗口(Hopping Window):固定大小,可重叠
- 会话窗口(Session Window):按活动间隙划分
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
// 过滤 + 转换
source.filter((key, value) -> value != null)
.mapValues(value -> value.toUpperCase())
.to("output-topic");
// 窗口聚合
source.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("window-count-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
四、RocketMQ
4.1 架构设计
RocketMQ 是阿里巴巴开源的分布式消息中间件,后捐赠给 Apache。
核心组件:
- NameServer:路由注册中心,无状态,Broker 定期注册,生产者消费者从中获取路由信息。类似 ZK 但更轻量,节点间不通信。
- Broker:消息存储节点,负责消息接收、存储、投递。主从架构,支持同步/异步复制。
- Producer:生产者,支持负载均衡和故障切换。发送消息时从 NameServer 获取 Broker 路由。
- Consumer:消费者,支持 Push 和 Pull 两种模式。实际 Push 模式也是长轮询实现。
部署架构:
NameServer1 NameServer2 NameServer3
\ | /
\ | /
Broker-a (Master) --- Broker-a (Slave)
Broker-b (Master) --- Broker-b (Slave)
4.2 事务消息
RocketMQ 独创的事务消息,用于解决分布式事务问题。
Half Message(半消息):
- Producer 发送 Half Message 到 Broker
- Half Message 对消费者不可见
- Producer 执行本地事务
- 根据本地事务结果提交或回滚
- 提交后消息对消费者可见
回查机制:
- 如果 Producer 执行本地事务后未返回状态(如宕机)
- Broker 定时回查 Producer 本地事务状态
- Producer 需实现
LocalTransactionChecker接口
// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
producer.setExecutorService(new ThreadPoolExecutor(...));
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
doLocalTransaction();
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
return checkTransactionStatus(msg.getTransactionId());
}
});
producer.start();
4.3 顺序消息
全局顺序:
- 所有消息发送到同一个分区
- 吞吐量极低,一般不推荐
分区顺序:
- 相同业务标识的消息发送到同一分区
- 分区内保证顺序
- 消费者单线程消费该分区
// 发送顺序消息,按 orderId 哈希到分区
SendResult result = producer.send(message,
(mqs, msg, arg) -> {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
},
orderId);
// 消费顺序消息,使用 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// 单线程消费,保证顺序
for (MessageExt msg : msgs) {
processInOrder(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
4.4 消息过滤
Tag 过滤:
- 生产者为消息设置 Tag
- 消费者订阅时指定感兴趣的 Tag
- 简单高效,但只支持 OR 语义
// 生产者设置 Tag
Message msg = new Message("order-topic", "created", body);
// 消费者订阅多个 Tag
consumer.subscribe("order-topic", "created || paid || shipped");
SQL92 表达式过滤:
- 支持更复杂的条件表达式
- 基于消息 Properties 进行过滤
- 需要在 Broker 开启
enablePropertyFilter=true
// 生产者设置属性
Message msg = new Message("order-topic", body);
msg.putUserProperty("amount", "1000");
msg.putUserProperty("vip", "true");
// 消费者使用 SQL92 过滤
consumer.subscribe("order-topic",
MessageSelector.bySql("amount > 500 AND vip = 'true'"));
五、消息可靠性保证
5.1 消息不丢失
消息可能丢失的环节及对策:
生产端丢失:
- 使用同步发送 + 重试机制
- 开启 Publisher Confirm(RabbitMQ)或 acks=all(Kafka)
- 记录发送日志,失败时告警
Broker 端丢失:
- 开启持久化:消息和队列都设置为持久化
- 多副本机制:至少写入 2 个节点
- 刷盘策略:同步刷盘更安全,异步刷盘性能更好
// RocketMQ 同步刷盘配置
brokerConfig.flushDiskType = FlushDiskType.SYNC_FLUSH;
消费端丢失:
- 手动 Ack,处理完成后再确认
- 异常时不自动确认,消息重新入队或进入死信队列
- 消费日志记录,便于排查
端到端可靠性方案:
- 本地事务表 + 定时补偿
- 事务消息(RocketMQ)
- 基于 Binlog 的 CDC 方案(如 Canal + Kafka)
5.2 消息幂等性
消息队列通常至少一次投递,消费者需要处理重复消息。
唯一 ID 去重:
- 为每条消息生成全局唯一 ID
- 消费者处理前检查是否已处理
// 使用 Redis 实现幂等性
String messageId = message.getUniqueKey();
Boolean isFirst = redisTemplate.opsForValue()
.setIfAbsent("mq:processed:" + messageId, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isFirst)) {
return; // 已处理,直接返回
}
processMessage(message);
去重表:
- 数据库中维护消息处理记录
- 利用唯一索引保证并发安全
CREATE TABLE message_record (
message_id VARCHAR(64) PRIMARY KEY,
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入成功说明首次处理
INSERT INTO message_record (message_id, status) VALUES (?, 'processing');
状态机:
- 业务状态只能单向流转
- 重复消息不会改变最终状态
// 订单状态机
if (order.getStatus() == OrderStatus.PAID) {
return; // 已支付,幂等处理
}
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
5.3 消息顺序性
为什么需要顺序:
- 订单创建 → 支付 → 发货,必须按序处理
- 乱序可能导致状态异常
单队列顺序:
- 生产者将关联消息发送到同一队列
- 消费者单线程消费
分区有序:
- Kafka 和 RocketMQ 保证分区内有序
- 相同 Key 的消息路由到同一分区
- 消费者组内单分区单线程消费
全局有序:
- 所有消息进入单分区/单队列
- 吞吐量受限,仅适用于低流量场景
乱序处理方案:
- 消息携带版本号,丢弃过期消息
- 消息携带时间戳,按时间排序处理
- 业务层状态机,只接受合法状态流转
5.4 消息积压处理
积压原因:
- 消费者故障
- 消费速度跟不上生产速度
- 消费逻辑异常导致大量消息进入重试队列
应急处理:
- 扩容消费者:增加消费者实例,提高消费并发
- 临时队列转发:新建队列,消费者只转发不处理,后续批量处理
- 丢弃非核心消息:降级处理,跳过非关键消息
// 临时消费者只转发
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
// 只转发到临时 Topic,不处理业务逻辑
tempProducer.send(new Message("temp-topic", msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
死信处理:
- 死信消息需要人工介入
- 分析死信原因,修复后重新消费
- 定期清理过期死信,释放存储空间
预防机制:
- 监控消费延迟,设置告警阈值
- 限流保护,消费速度下降时降低生产速度
- 定期演练,验证扩容和降级预案
六、MQ 选型对比
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 开发语言 | Erlang | Scala/Java | Java |
| 协议支持 | AMQP、MQTT、STOMP | 自定义协议 | 自定义协议、OpenMessaging |
| 吞吐量 | 万级/秒 | 十万级/秒 | 十万级/秒 |
| 延迟 | 微秒级 | 毫秒级 | 毫秒级 |
| 消息可靠性 | 高(持久化+确认机制) | 高(多副本+ISR) | 高(同步刷盘+多副本) |
| 事务消息 | 不支持 | 支持(幂等+事务) | 支持(Half Message) |
| 顺序消息 | 不支持 | 分区有序 | 分区有序、全局顺序 |
| 消息过滤 | Exchange 路由 | 无原生支持 | Tag、SQL92 |
| 消息回溯 | 不支持 | 支持(Offset) | 支持(时间/Offset) |
| 集群模式 | 镜像队列、Quorum Queue | 分区副本、ISR | 主从同步、DLedger |
| 运维复杂度 | 中 | 高(依赖 ZK) | 中 |
| 社区活跃度 | 高 | 极高 | 高 |
| 适用场景 | 企业集成、复杂路由 | 日志采集、流处理 | 金融级可靠、订单场景 |
选型建议:
- 选择 RabbitMQ:需要复杂路由规则、企业系统集成、微服务间通信,对延迟敏感
- 选择 Kafka:日志采集、用户行为分析、流处理场景,需要极高吞吐量和消息回溯能力
- 选择 RocketMQ:金融级消息可靠性、事务消息需求、订单/支付等强一致性场景
七、学习资源推荐
官方文档:
- RabbitMQ: https://www.rabbitmq.com/documentation.html
- Kafka: https://kafka.apache.org/documentation/
- RocketMQ: https://rocketmq.apache.org/docs/
书籍推荐:
- 《RabbitMQ 实战》:适合 RabbitMQ 入门到进阶
- 《Kafka 权威指南》:全面理解 Kafka 架构和使用
- 《RocketMQ 技术内幕》:深入理解 RocketMQ 设计原理
实践建议:
- 先选择一种 MQ 深入学习,理解其设计哲学
- 搭建本地环境,完成基本的生产消费示例
- 模拟故障场景,验证可靠性保证机制
- 阅读源码,理解核心流程的实现细节
- 参与开源项目或在实际项目中应用
面试高频考点:
- 如何保证消息不丢失?
- 如何处理消息积压?
- 如何实现消息幂等性?
- 如何保证消息顺序性?
- 消息队列的选型依据是什么?
- Kafka 为什么吞吐量高?(零拷贝、顺序写、分区并行)
- RocketMQ 事务消息的实现原理?
- Kafka Rebalance 的过程和影响?