Linux下RabbitMQ
作者:mmseoamin日期:2023-12-21

安装

网址 rabbitmq官网

需要下载一个市erlang环境,因为rabbitmq是用erlang开发的。

·erlang-21.3-1.el7.x86_64.rpm

·rabbitmq-server-3.8.8-1.el7.noarch.rpm

上传

上传到/usr/local下

安装命令:

rpm -ivh erlang-21.3-1.el7.x86_64.rpm #erlang环境
yum install socat -y #socat依赖
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #rabbitmq安装

一些常用命令

添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management

rabbitmq有时候登录会出现用户权限问题导致无法登录

添加新用户:

创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator#超级管理员
设置用户权限
set_permissions [-p ]    
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users
关闭应用的命令为
rabbitmqctl stop_app
清除的命令为
rabbitmqctl reset
重新启动命令为
rabbitmqctl start_app

依赖引入:





org.apache.maven.plugins
maven-compiler-plugin

8
8







com.rabbitmq
amqp-client
5.8.0



commons-io
commons-io
2.6


测试代码

生产者:

public class Producer {
    public static final String QUEUE_NAME="hello";
    public static final String HOST="47.99.113.73";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="123";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         /**
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
          *@param mandatory true if the 'mandatory' flag is to be set
        *  @param immediate true if the 'immediate' flag is to be
                 set. Note that the RabbitMQ server does not support this flag.
          @param props other properties for the message - routing headers etc
          @param body the message body
                 * */
         String message="hello world!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息完毕!");
    }
}

需要打开服务器5672端口,否则会出现连接超时。15672和5672的区别

有几个细节:ConnectionFactory和channel实现了java.lang.AutoCloseable接口不需要手动关闭;如果队列不存在就会自己创建;有时候会出现消息生产失败也可能是内存大小问题,默认是至少有200M磁盘空闲,可以通过设置disk_free_limit来修改参数。

为什么尝试使用自动关闭资源而不是手动关闭?

“By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.”

消费者

public class Consumer {
    public static final String QUEUE_NAME="hello";
    public static final String HOST="47.99.113.73";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="123";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Channel channel = factory.newConnection().createChannel();
        /**
         * @param queue the name of the queue
         * @param autoAck true if the server should consider messages
         *      acknowledged once delivered; false if the server should expect
         *      explicit acknowledgements
         * @param deliverCallback callback when a message is delivered
         * @param shutdownSignalCallback callback when the channel/connection is shut down
         * */
        channel.basicConsume(QUEUE_NAME, true,
                (String consumerTag, Delivery message)->{
                    //处理消息
                    System.out.println("message:"+new String(message.getBody()));
                },
                (String consumerTag)->{
                    System.out.println("消息被中断;"+consumerTag);
                });
        System.out.println("消息接收完毕!");
    }
}

工作队列

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

工作队列(又称:任务队列)的主要思想是避免立即做一个资源密集型的任务,而不得不等待它完成。相反,我们将任务安排在以后完成。我们将一个任务封装为一条消息,并将其发送到一个队列中。一个在后台运行的工作者进程将弹出任务,并最终执行工作。当你运行许多工作者时,任务将在他们之间共享。

简单来说就是解决大量消息被轮流发放处理。

生产者:

/**
 * 工作队列生产者
 * */
public class Sender {
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) {
        Channel channel = ChannelUtil.getChannel();
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()){
            String next = sc.next();
            try {
                channel.queueDeclare(QUEUE_NAME, false,false, false, null);
                channel.basicPublish("", QUEUE_NAME,null, next.getBytes() );
                System.out.println("工作队列1已经发送消息"+next);
            }catch (Exception e){
                System.out.println("发送异常:"+e.getMessage());
            }
        }
    }
}

开启两个消费者:

/**
 * 工作队列处理者,处理生产者产生的大量消息
 * */
public class Worker {
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) {
        new Thread(()->{
            Channel channel = ChannelUtil.getChannel();
            try {
                channel.basicConsume(QUEUE_NAME, false,
                        (String consumerTag, Delivery message)->{
                            System.out.println("队列1:"+new String(message.getBody()));
                            /**
                             * @param1:消息标记,哪个消息应答了
                             * @param2:取消应答同一信道所有消息
                             * */
                            channel.basicAck(message.getEnvelope().getDeliveryTag(),false );
                        },
                        (String consumerTag)->{});
            }catch (Exception e){
                System.out.println("消息消费异常");
            }
        },"线程1").start();
        new Thread(()->{
            Channel channel = ChannelUtil.getChannel();
            try {
                channel.basicConsume(QUEUE_NAME, false,
                        (String consumerTag, Delivery message)->{
                            System.out.println("队列2:"+new String(message.getBody()));
                            /**
                             * @param1:消息标记,哪个消息应答了
                             * @param2:取消应答同一信道所有消息
                             * */
                            channel.basicAck(message.getEnvelope().getDeliveryTag(),false );
                        },
                        (String consumerTag)->{});
            }catch (Exception e){
                System.out.println("消息消费异常");
            }
        },"线程2").start();
    }
}

rabbitmq消息重新分发机制

在手动应答下,当某个消费者在接收消息后出现宕机等突发情况造成消息丢失但由于,这时候rabbitmq没有接收到手动应答的信号,并没有将该消息丢弃而是转发给其他消费者。

rabbitmq消息持久化

将第二个参数durable改为true即可。

channel.queueDeclare(QUEUE_NAME, true,false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, next.getBytes() );

不公平分发

默认情况下是轮询分发,某些情况下,某些消费者处理消息比较快,使用不公平分发策略消息处理更快一点。

channel.basicQos(1);

预取值

在消费者中提前向其信道中所放的数据数量。指定消费者消费消息数量。

@param prefetchCount maximum number of messages that the server
 will deliver, 0 if unlimited
channel.basicQos(prefetchCount);

未完…