限制消费线程

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Minimal RocketMQ push-consumer example that limits the number of consume threads.
 *
 * <p>The consumer is created for the group {@code "consumerGroup"}, pointed at the name server
 * {@code "nameServerAddress"}, given a concurrent message listener that prints each message body,
 * and then started.
 *
 * <p>NOTE(review): no {@code subscribe(...)} call is made before {@code start()}, so no topic
 * subscription is configured here — confirm whether that was omitted for brevity.
 */
public class RocketMQConsumerExample {
    private static final Logger LOG = Logger.getLogger(RocketMQConsumerExample.class.getName());

    /** Thread count applied to both the minimum and maximum consume-thread settings. */
    private static final int CONSUME_THREADS = 2;

    /**
     * Builds, configures and starts the consumer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Create the consumer instance.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("nameServerAddress");

        // Set both bounds explicitly so the limit does not depend on the client's default maximum.
        consumer.setConsumeThreadMax(CONSUME_THREADS);
        consumer.setConsumeThreadMin(CONSUME_THREADS);

        // Register the message listener. An anonymous class is used on purpose:
        // registerMessageListener is overloaded, so an implicitly typed lambda would be ambiguous.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Message handling logic: print every body in the delivered batch.
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default.
                    System.out.println("Received message: " + new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            // Start the consumer; the success line is printed only if start() did not throw.
            consumer.start();
            System.out.println("Consumer started.");
        } catch (MQClientException e) {
            LOG.log(Level.SEVERE, "Failed to start RocketMQ consumer", e);
        }
    }
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * Sketch of how to build and start a RocketMQ push consumer whose minimum and maximum
 * consume-thread counts are both set to 10.
 */
public class PseudoCodeExample {
    private static final Logger LOG = Logger.getLogger(PseudoCodeExample.class.getName());

    /**
     * Creates a push consumer for the group {@code "consumerId"} using the name server
     * {@code "namesrvAddr"}, with both consume-thread bounds set to 10 and an
     * {@code AllocateMessageQueueAveragelyByCircle} queue allocation strategy.
     *
     * @return the configured consumer, or {@code null} if any step of the configuration threw
     */
    public DefaultMQPushConsumer createConsumer() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerId");
            consumer.setNamesrvAddr("namesrvAddr");
            consumer.setConsumeThreadMax(10);
            consumer.setConsumeThreadMin(10);
            consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
            return consumer;
        } catch (Exception e) {
            // Record the failure instead of dropping it; callers still get null as before.
            LOG.log(Level.SEVERE, "Failed to create RocketMQ consumer", e);
            return null;
        }
    }

    /**
     * Subscribes the consumer to a topic, registers the listener and starts the consumer.
     * Does nothing when {@code consumer} is {@code null}, which is what
     * {@link #createConsumer()} returns on failure.
     *
     * @param consumer the consumer to start; may be {@code null}
     * @param topic    the topic passed to {@code subscribe}
     * @param tag      the subscription expression passed to {@code subscribe}
     * @param listener the listener to register before starting
     * @throws IllegalStateException if subscribing or starting fails with a checked exception;
     *                               the original exception is attached as the cause
     */
    public void startConsumer(DefaultMQPushConsumer consumer, String topic, String tag, MessageListener listener) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.subscribe(topic, tag);
            consumer.registerMessageListener(listener);
            consumer.start();
        } catch (RuntimeException e) {
            // Unchecked failures pass through untouched.
            throw e;
        } catch (Exception e) {
            // subscribe() and start() declare a checked exception; wrap it so the method
            // signature stays free of checked exceptions while the cause is preserved.
            throw new IllegalStateException("Failed to start consumer for topic " + topic, e);
        }
    }
}