消息队列

难度等级:⭐⭐⭐ 前置知识:Web 后端开发 后续衔接:微服务架构、事件驱动架构

学习路径


一、消息队列基础

1.1 为什么需要消息队列

消息队列(Message Queue,简称 MQ)是分布式系统中重要的基础设施,主要解决以下问题:

异步处理:在同步调用场景中,用户注册需要写入数据库、发送验证邮件、初始化用户画像,整个过程可能需要 3 秒。引入消息队列后,主流程只写入数据库(200ms),其余操作通过消息异步完成,响应时间大幅降低。

削峰填谷:电商秒杀场景下,瞬时流量可能达到每秒数万请求,直接打到数据库会导致服务崩溃。消息队列作为缓冲区,将请求暂存后以数据库能承受的速度消费,保护下游系统。

系统解耦:订单系统完成支付后,需要通知库存系统扣减、物流系统发货、积分系统增加积分。如果采用同步调用,订单系统需要知道所有下游系统的接口,耦合严重。通过消息队列,订单系统只需发送一条消息,下游系统各自订阅,新增系统无需修改订单代码。

最终一致性:在分布式系统中,跨服务的数据一致性很难通过事务保证。消息队列结合本地事务表或事务消息,可以实现跨服务的最终一致性。

1.2 核心概念

Producer(生产者):发送消息的应用。生产者将消息发送到消息中间件,不关心谁消费、何时消费。

Consumer(消费者):接收并处理消息的应用。消费者从消息中间件拉取或订阅消息进行处理。

Broker(代理):消息队列的服务端,负责接收、存储、转发消息。一个 MQ 集群通常由多个 Broker 组成。

Topic(主题):消息的逻辑分类。生产者将消息发送到指定 Topic,消费者订阅感兴趣的 Topic。

Queue(队列):消息的物理存储单元。一个 Topic 可以包含多个 Queue,用于水平扩展。

Message(消息):传输的数据单元,通常包含 Header(元数据)、Body(消息体)、Properties(自定义属性)。

Offset(偏移量):消费者在队列中的消费位置标记,用于记录已消费到哪条消息。

1.3 消息模型

点对点模型(Point-to-Point)

生产者 → [Queue] → 消费者A(消费消息1)
                → 消费者B(消费消息2)

发布订阅模型(Publish-Subscribe)

生产者 → [Topic] → 消费者组A → 消费者A1
                   → 消费者组B → 消费者B1
                                → 消费者B2

两种模型各有适用场景,现代 MQ 产品通常同时支持两种模型。


二、RabbitMQ

2.1 架构与协议

RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议实现,是开源消息中间件中协议支持最完善的产品。

AMQP 协议核心概念

Exchange 类型

类型 路由规则 典型场景
Direct 精确匹配 Routing Key 单播、指定消费者
Topic 通配符匹配(* 匹配一个词,# 匹配零或多个词) 多播、按规则路由
Fanout 广播到所有绑定队列 事件广播、日志分发
Headers 根据消息 Header 属性匹配 复杂条件路由
// Direct Exchange 示例
channel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("order.queue", true, false, false, null);
channel.queueBind("order.queue", "order.exchange", "order.created");

// 发送消息
channel.basicPublish("order.exchange", "order.created", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    message.getBytes());

2.2 消息确认机制

RabbitMQ 提供多层确认机制,确保消息不丢失:

Publisher Confirm(生产者确认)

// 异步 Confirm 示例
channel.confirmSelect();
channel.addConfirmListener(
    (deliveryTag, multiple) -> {
        // 消息成功投递到 Broker
        log.info("Message {} confirmed", deliveryTag);
    },
    (deliveryTag, multiple) -> {
        // 消息投递失败,需要重试
        log.error("Message {} nacked", deliveryTag);
    }
);

Consumer Ack(消费者确认)

// 手动 Ack 示例
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        processMessage(delivery.getBody());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // requeue=false 时消息进入死信队列
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
    }
};
channel.basicConsume("order.queue", false, deliverCallback, consumerTag -> {});

2.3 死信队列与延迟队列

死信队列(DLX - Dead Letter Exchange): 消息在以下情况成为死信:

  1. 消费者拒绝且 requeue=false
  2. 消息 TTL 过期
  3. 队列达到最大长度

配置死信队列:

// 主队列配置死信参数
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routing");
channel.queueDeclare("main.queue", true, false, false, args);

// 死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing");

延迟队列: RabbitMQ 原生不支持延迟队列,通过以下方式实现:

  1. TTL + DLX:设置消息 TTL,过期后转入死信队列,消费者从死信队列消费
  2. 延迟队列插件:安装 rabbitmq-delayed-message-exchange 插件
// 插件方式实现延迟队列
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);

// 发送延迟消息
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000); // 延迟 5 秒
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(headers).build();
channel.basicPublish("delayed.exchange", "delayed.routing", props, message.getBytes());

2.4 集群与高可用

普通集群模式

镜像队列(Mirrored Queue)

Quorum Queue(仲裁队列)

# 声明仲裁队列
rabbitmqadmin declare queue name=quorum.queue durable=true arguments='{"x-queue-type":"quorum"}'

三、Kafka

3.1 核心架构

Kafka 是 LinkedIn 开源的分布式流平台,设计目标是高吞吐、可扩展、持久化。

核心组件

副本机制

# 创建 Topic,3 个分区,每个分区 2 个副本
kafka-topics.sh --create --topic order-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 2

3.2 生产者

分区策略

acks 配置

幂等性

事务

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-topic", "key", "message"));
producer.close();

3.3 消费者

Consumer Group(消费者组)

Rebalance(重平衡)

Offset 管理

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync(); // 手动提交 offset
}

3.4 存储机制

Segment 文件

日志压缩(Log Compaction)

零拷贝(Zero Copy)

磁盘 → Page Cache → Socket Buffer → 网卡
(跳过用户空间拷贝)

数据过期策略

3.5 Kafka Streams

Kafka Streams 是 Kafka 提供的轻量级流处理库,无需独立集群。

核心概念

窗口操作

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

// 过滤 + 转换
source.filter((key, value) -> value != null)
      .mapValues(value -> value.toUpperCase())
      .to("output-topic");

// 窗口聚合
source.groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
      .count()
      .toStream()
      .to("window-count-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

四、RocketMQ

4.1 架构设计

RocketMQ 是阿里巴巴开源的分布式消息中间件,后捐赠给 Apache。

核心组件

部署架构

NameServer1    NameServer2    NameServer3
      \            |            /
       \           |           /
        Broker-a (Master) --- Broker-a (Slave)
        Broker-b (Master) --- Broker-b (Slave)

4.2 事务消息

RocketMQ 独创的事务消息,用于解决分布式事务问题。

Half Message(半消息)

  1. Producer 发送 Half Message 到 Broker
  2. Half Message 对消费者不可见
  3. Producer 执行本地事务
  4. 根据本地事务结果提交或回滚
  5. 提交后消息对消费者可见

回查机制

// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
producer.setExecutorService(new ThreadPoolExecutor(...));
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            doLocalTransaction();
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        return checkTransactionStatus(msg.getTransactionId());
    }
});
producer.start();

4.3 顺序消息

全局顺序

分区顺序

// 发送顺序消息,按 orderId 哈希到分区
SendResult result = producer.send(message, 
    (mqs, msg, arg) -> {
        Long orderId = (Long) arg;
        int index = (int) (orderId % mqs.size());
        return mqs.get(index);
    }, 
    orderId);

// 消费顺序消息,使用 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
                                                ConsumeOrderlyContext context) {
        // 单线程消费,保证顺序
        for (MessageExt msg : msgs) {
            processInOrder(msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

4.4 消息过滤

Tag 过滤

// 生产者设置 Tag
Message msg = new Message("order-topic", "created", body);

// 消费者订阅多个 Tag
consumer.subscribe("order-topic", "created || paid || shipped");

SQL92 表达式过滤

// 生产者设置属性
Message msg = new Message("order-topic", body);
msg.putUserProperty("amount", "1000");
msg.putUserProperty("vip", "true");

// 消费者使用 SQL92 过滤
consumer.subscribe("order-topic", 
    MessageSelector.bySql("amount > 500 AND vip = 'true'"));

五、消息可靠性保证

5.1 消息不丢失

消息可能丢失的环节及对策:

生产端丢失

Broker 端丢失

// RocketMQ 同步刷盘配置
brokerConfig.flushDiskType = FlushDiskType.SYNC_FLUSH;

消费端丢失

端到端可靠性方案

  1. 本地事务表 + 定时补偿
  2. 事务消息(RocketMQ)
  3. 基于 Binlog 的 CDC 方案(如 Canal + Kafka)

5.2 消息幂等性

消息队列通常至少一次投递,消费者需要处理重复消息。

唯一 ID 去重

// 使用 Redis 实现幂等性
String messageId = message.getUniqueKey();
Boolean isFirst = redisTemplate.opsForValue()
    .setIfAbsent("mq:processed:" + messageId, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isFirst)) {
    return; // 已处理,直接返回
}
processMessage(message);

去重表

CREATE TABLE message_record (
    message_id VARCHAR(64) PRIMARY KEY,
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 插入成功说明首次处理
INSERT INTO message_record (message_id, status) VALUES (?, 'processing');

状态机

// 订单状态机
if (order.getStatus() == OrderStatus.PAID) {
    return; // 已支付,幂等处理
}
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);

5.3 消息顺序性

为什么需要顺序

单队列顺序

分区有序

全局有序

乱序处理方案

5.4 消息积压处理

积压原因

应急处理

  1. 扩容消费者:增加消费者实例,提高消费并发
  2. 临时队列转发:新建队列,消费者只转发不处理,后续批量处理
  3. 丢弃非核心消息:降级处理,跳过非关键消息
// 临时消费者只转发
consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        // 只转发到临时 Topic,不处理业务逻辑
        tempProducer.send(new Message("temp-topic", msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

死信处理

预防机制


六、MQ 选型对比

特性 RabbitMQ Kafka RocketMQ
开发语言 Erlang Scala/Java Java
协议支持 AMQP、MQTT、STOMP 自定义协议 自定义协议、OpenMessaging
吞吐量 万级/秒 十万级/秒 十万级/秒
延迟 微秒级 毫秒级 毫秒级
消息可靠性 高(持久化+确认机制) 高(多副本+ISR) 高(同步刷盘+多副本)
事务消息 不支持 支持(幂等+事务) 支持(Half Message)
顺序消息 不支持 分区有序 分区有序、全局顺序
消息过滤 Exchange 路由 无原生支持 Tag、SQL92
消息回溯 不支持 支持(Offset) 支持(时间/Offset)
集群模式 镜像队列、Quorum Queue 分区副本、ISR 主从同步、DLedger
运维复杂度 高(依赖 ZK)
社区活跃度 极高
适用场景 企业集成、复杂路由 日志采集、流处理 金融级可靠、订单场景

选型建议


七、学习资源推荐

官方文档

书籍推荐

实践建议

  1. 先选择一种 MQ 深入学习,理解其设计哲学
  2. 搭建本地环境,完成基本的生产消费示例
  3. 模拟故障场景,验证可靠性保证机制
  4. 阅读源码,理解核心流程的实现细节
  5. 参与开源项目或在实际项目中应用

面试高频考点