限制消费线程
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumerExample {
public static void main(String[] args) {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("nameServerAddress");
// 设置最小消费线程数为 2
consumer.setConsumeThreadMin(2);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 处理消息逻辑
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class PseudoCodeExample {
public DefaultMQPushConsumer createConsumer() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerId");
consumer.setNamesrvAddr("namesrvAddr");
consumer.setConsumeThreadMax(10);
consumer.setConsumeThreadMin(10);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
return consumer;
} catch (Exception e) {
// 记录错误日志
return null;
}
}
public void startConsumer(DefaultMQPushConsumer consumer, String topic, String tag, MessageListener listener) {
if (consumer!= null) {
consumer.subscribe(topic, tag);
consumer.registerMessageListener(listener);
consumer.start();
}
}
}