官网地址: https://www.rabbitmq.com/#getstarted

1 下载安装

1.1 安装 Erlang

Erlang官网: https://www.erlang.org/downloads

RabbitMQ 是由 Erlang 语言编写的,因此需要先下载并安装 Erlang,本示例采用 19.3 版本

# 安装依赖模块
[root@study opt]# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
[root@study opt]# yum install ncurses-devel
# 解压缩与配置
[root@study opt]# cd /opt
[root@study opt]# tar zxvf otp_src_19.3.tar.gz
[root@study opt]# cd otp_src_19.3
[root@study otp_src_19.3]# ./configure --prefix=/opt/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

# 安装 erlang
[root@study otp_src_19.3]# make && make install

# 配置环境变量
[root@study ~]# vim /etc/profile
ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME
[root@study ~]# source /etc/profile

# 验证是否安装成功
[root@study otp_src_19.3]# erl
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]

1.2 安装 RabbitMQ

使用 3.6.15 版本

[root@study opt]# cd /opt
[root@study opt]# tar -xvJf rabbitmq-server-generic-unix-3.6.15.tar.xz
[root@study opt]# mv rabbitmq_server-3.6.15/ rabbitmq

# 配置环境变量
[root@study opt]# vim /etc/profile
export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq
[root@study ~]# source /etc/profile

# 使用 -detached 参数是为了让 RabbitMQ 以守护进程方式在后台运行
[root@study ~]# rabbitmq-server -detached
Warning: PID file not written; -detached was passed.

# 查看 RabbitMQ 状态
[root@study ~]# rabbitmqctl status
Status of node rabbit@study
[{pid,25358},
 {running_applications,
     [{rabbit,"RabbitMQ","3.6.15"},
      {mnesia,"MNESIA  CXC 138 12","4.14.3"},
      {os_mon,"CPO  CXC 138 46","2.4.2"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.6.15"},
      {syntax_tools,"Syntax tools","2.1.1"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.3.2"},
      {ssl,"Erlang/OTP SSL application","8.1.1"},
      {public_key,"Public key infrastructure","1.4"},
      {crypto,"CRYPTO","3.7.3"},
      {asn1,"The Erlang ASN1 compiler version 4.0.4","4.0.4"},
      {compiler,"ERTS  CXC 138 10","7.0.4"},
      {xmerl,"XML parser","1.3.13"},
      {recon,"Diagnostic tools for production use","2.3.2"},
      {sasl,"SASL  CXC 138 11","3.0.3"},
      {stdlib,"ERTS  CXC 138 10","3.3"},
      {kernel,"ERTS  CXC 138 10","5.2"}]},
 {os,{unix,linux}},
...
# 可以看到启动成功了

1.3 新增账号

默认情况下 RabbitMQ 有一个 guest 账户,但该账户只允许通过 localhost 访问,无法从远程网络登录,所以需要新添加一个账户。

# 创建 admin 用户,密码为 123456
[root@study ~]# rabbitmqctl add_user admin 123456
Creating user "admin"

# 设置用户拥有所有权限
[root@study ~]# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/"

# 设置 admin 用户为 管理员角色
[root@study ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator]

2 使用方式

2.1 基础使用

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.1</version>
</dependency>
/**
 * Producer demo.
 *
 * <p>Connects to the broker, declares a direct exchange and a queue, binds them with a
 * routing key, publishes a single persistent text message and then releases the
 * channel and the connection.
 */
public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingky_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.52.52";
    // Default port of the RabbitMQ server is 5672
    private static final int PORT = 5672;

    /**
     * Publishes one "Hello World!" message to {@code EXCHANGE_NAME} using {@code ROUTING_KEY}.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring, binding, publishing or closing fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("admin");
        factory.setPassword("123456");

        final Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            // Declare an exchange of type "direct": durable, not auto-deleted
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // Declare a queue: durable, non-exclusive, not auto-deleted
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Bind the queue to the exchange with the routing key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            String message = "Hello World!";
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset
            channel.basicPublish(EXCHANGE_NAME,
                    ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            channel.close();
        } finally {
            // Always release the connection, even when a step above failed; closing the
            // connection also closes any channel still open on it. The isOpen() guard avoids
            // masking the original failure with an "already closed" error.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
package cn.mrcode.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Consumer demo.
 *
 * <p>Connects to the broker, subscribes to {@code QUEUE_NAME}, prints every delivered
 * message and acknowledges it, then shuts down after a fixed waiting period.
 */
public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.52.52";
    private static final int PORT = 5672;

    /**
     * Consumes messages from {@code QUEUE_NAME} for about ten seconds.
     *
     * @param args unused
     * @throws IOException          if connecting, subscribing or closing fails
     * @throws TimeoutException     if connecting or closing times out
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        final Address[] addresses = {
                new Address(IP_ADDRESS, PORT)
        };
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("123456");

        // The broker address is supplied as an Address array to newConnection(...)
        // instead of being configured on the factory via setHost/setPort
        final Connection connection = factory.newConnection(addresses);
        try {
            final Channel channel = connection.createChannel();
            // Limit the number of unacknowledged messages this client may hold
            channel.basicQos(64);
            channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 so the output does not depend on the platform charset
                    System.out.println("recv message: " + new String(body, StandardCharsets.UTF_8));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the calling thread instead of swallowing it
                        Thread.currentThread().interrupt();
                    }
                    // Acknowledge only this delivery (multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            // Give the consumer callback time to run before releasing resources
            TimeUnit.SECONDS.sleep(10);
            channel.close();
        } finally {
            // Always release the connection, even when a step above failed; closing the
            // connection also closes any channel still open on it. The isOpen() guard avoids
            // masking the original failure with an "already closed" error.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

2.2 Spring Boot 的 starter 方式使用