您现在的位置是:亿华云 > IT科技类资讯

如何在 SpringBoot 项目中控制 RocketMQ消费线程数量

亿华云2025-10-03 06:31:49【IT科技类资讯】6人已围观

简介1 背景最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个 topic 消费线程的最小数量和最大数量,用来区分不同 topic

1 背景

最近在新项目开发中遇到一个有趣的项消费线程问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。目中如何设置单个 topic 消费线程的控制最小数量和最大数量,用来区分不同 topic 吞吐量不同。数量

我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。项消费线程

2 RocketMQ 消息监听

设置消费者组为 my_consumer_group,目中监听 TopicTest 队列,控制并使用并发消息监听器MessageListenerConcurrently

1public class Consumer {

2

3 public static void main(String[] args) throws InterruptedException,数量 MQClientException {

4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");

5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6 consumer.subscribe("TopicTest", "*");

7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");

8 consumer.registerMessageListener(new MessageListenerConcurrently() {

9 @Override

10 public ConsumeConcurrentlyStatus consumeMessage(Listmsgs,

11 ConsumeConcurrentlyContext context) {

12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

14 }

15 });

16 consumer.start();

17 System.out.printf("Consumer Started.%n");

18 }

19}

3 RocketMQ 中连接结构图

4 消费监听器

接口:org.apache.rocketmq.client.consumer.listener.MessageListener

有两个子接口:

- 顺序消费:MessageListenerOrderly

- 并发消费: MessageListenerConcurrently

4.1 MessageListenerConcurrently

作用:consumer并发消费消息的监听器

比如,在 quick start 中,项消费线程就是目中使用的并发消费消息监听器:​

1 consumer.registerMessageListener(new MessageListenerConcurrently() {

2 @Override

3 public ConsumeConcurrentlyStatus consumeMessage(Listmsgs,

4 ConsumeConcurrentlyContext context) {

5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

7 }

8 });

方法返回值,是控制个枚举:

1 package org.apache.rocketmq.client.consumer.listener;

2

3/

**

4 * 并发消费mq消息结果

5 */

6public enum ConsumeConcurrentlyStatus {

7

8 /

**

9 * Success consumption

10 * 成功消费

11 */

12 CONSUME_SUCCESS,

13

14 /

**

15 * Failure consumption,later try to consume

16 * 失败消费,稍后尝试消费

17

*

18

*

19 * 如果 { @link MessageListener}返回的数量消费结果为 RECONSUME_LATER,则需要将这些消息发送给Broker延迟消息。

20 * 如果给broker发送消息失败,项消费线程将延迟5s后提交线程池进行消费。

21

*

22 * RECONSUME_LATER的目中消息发送入口: MQClientAPIImpl#consumerSendMessageBack,

23 * 命令编码: { @link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}

24 */

25 RECONSUME_LATER;

26}

画外音:

当前,我们在具体开发中,源码库控制肯定不会直接使用这种方式来写consumer。

常用的Consumer实现是:基于 推 的consumer:DefaultMQPushConsumer

4.2 MessageListenerOrderly

作用:consumer顺序消费消息的监听器

5 消费线程池

5.1 DefaultMQPushConsumer

作用:基于 推 的consumer消费者

5.2 注册并发消息监听器

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener

当使用这个方法注册消息监听器时,实际上会把这个病发消息监听器设置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner属性中。

5.3 设置 consumer 消费 service

可选有两种:​

并发消费的service

顺序消费的service

当consumer在启动的时,会使用MessageListener具体实现类型进行判断:

MessageListener 就有并发和顺序两种,所以service也有两种。

1public synchronized void start() throws MQClientException {

2 switch (this.serviceState) {

3 case CREATE_JUST:

4

5 // 省略一部分代码...........

6

7 // 根据注册的监听器类型[并发消息监听器/顺序执行消息监听器],来确定使用哪种消费服务.

8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

9 this.consumeOrderly = true;

10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

12 this.consumeOrderly = false;

13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

14 }

15 this.consumeMessageService.start();

16

17 // 省略一部分代码..........

18 this.serviceState = ServiceState.RUNNING;

19 break;

20 case RUNNING:

21 case START_FAILED:

22 case SHUTDOWN_ALREADY:

23 throw new MQClientException("The PushConsumer service state not OK, maybe started once");

24 default:

25 break;

26 }

27

28 // 省略一部分代码..........

29 }

如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService :

在实例化的时候,会创建一个线程池:

1// 无界队列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫无意义了.

2this.consumeRequestQueue = new LinkedBlockingQueue();

3this.consumeExecutor = new ThreadPoolExecutor(

4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默认20

5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默认64

6 1000 * 60,

7 TimeUnit.MILLISECONDS,

8 this.consumeRequestQueue,

9 new ThreadFactoryImpl("ConsumeMessageThread_"));

consumer消费线程池参数:

默认最小消费线程数 20默认最大消费线程数 64keepAliveTime = 60*1000      单位:秒队列:new LinkedBlockingQueue<>()​ 无界队列线程名称:前缀是站群服务器:ConsumeMessageThread_

注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。

5.4 修改线程池线程数

上面我们已经知道了,设置线程池的最大线程数是没什么用的。

那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。

1public static void main(String[] args) throws InterruptedException, MQClientException {

2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

3

4 consumer.setConsumeThreadMin(30);

5 consumer.setConsumeThreadMax(64);

6

7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

8 consumer.subscribe("TopicTest", "*");

9 consumer.registerMessageListener(new MessageListenerConcurrently() {

10

11 @Override

12 public ConsumeConcurrentlyStatus consumeMessage(Listmsgs,

13 ConsumeConcurrentlyContext context) {

14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

16 }

17 });

18 consumer.start();

19 System.out.printf("Consumer Started.%n");

20 }

注意:consumeThreadMin​ 如果大于64,则也需要设置 consumeThreadMax 参数,因为有个校验:

-修改线程池线程数-SpringBoot版

如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数:

很赞哦!(74421)