网址 rabbitmq官网
需要下载一个市erlang环境,因为rabbitmq是用erlang开发的。
·erlang-21.3-1.el7.x86_64.rpm
·rabbitmq-server-3.8.8-1.el7.noarch.rpm
上传到/usr/local下
安装命令:
rpm -ivh erlang-21.3-1.el7.x86_64.rpm #erlang环境 yum install socat -y #socat依赖 rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #rabbitmq安装
一些常用命令
添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on 启动服务 /sbin/service rabbitmq-server start 查看服务状态 /sbin/service rabbitmq-server status 停止服务(选择执行) /sbin/service rabbitmq-server stop 开启 web 管理插件 rabbitmq-plugins enable rabbitmq_management
rabbitmq有时候登录会出现用户权限问题导致无法登录
添加新用户:
创建账号 rabbitmqctl add_user admin 123 设置用户角色 rabbitmqctl set_user_tags admin administrator#超级管理员 设置用户权限 set_permissions [-p] rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色 rabbitmqctl list_users 关闭应用的命令为 rabbitmqctl stop_app 清除的命令为 rabbitmqctl reset 重新启动命令为 rabbitmqctl start_app
org.apache.maven.plugins maven-compiler-plugin 8 com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6
生产者:
public class Producer { public static final String QUEUE_NAME="hello"; public static final String HOST="47.99.113.73"; public static final String USER_NAME="admin"; public static final String PASSWORD="123"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * @param exchange the exchange to publish the message to * @param routingKey the routing key *@param mandatory true if the 'mandatory' flag is to be set * @param immediate true if the 'immediate' flag is to be set. Note that the RabbitMQ server does not support this flag. @param props other properties for the message - routing headers etc @param body the message body * */ String message="hello world!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完毕!"); } }
需要打开服务器5672端口,否则会出现连接超时。15672和5672的区别
有几个细节:ConnectionFactory和channel实现了java.lang.AutoCloseable接口不需要手动关闭;如果队列不存在就会自己创建;有时候会出现消息生产失败也可能是内存大小问题,默认是至少有200M磁盘空闲,可以通过设置disk_free_limit来修改参数。
为什么尝试使用自动关闭资源而不是手动关闭?
“By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.”
消费者:
public class Consumer { public static final String QUEUE_NAME="hello"; public static final String HOST="47.99.113.73"; public static final String USER_NAME="admin"; public static final String PASSWORD="123"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Channel channel = factory.newConnection().createChannel(); /** * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param deliverCallback callback when a message is delivered * @param shutdownSignalCallback callback when the channel/connection is shut down * */ channel.basicConsume(QUEUE_NAME, true, (String consumerTag, Delivery message)->{ //处理消息 System.out.println("message:"+new String(message.getBody())); }, (String consumerTag)->{ System.out.println("消息被中断;"+consumerTag); }); System.out.println("消息接收完毕!"); } }
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
工作队列(又称:任务队列)的主要思想是避免立即做一个资源密集型的任务,而不得不等待它完成。相反,我们将任务安排在以后完成。我们将一个任务封装为一条消息,并将其发送到一个队列中。一个在后台运行的工作者进程将弹出任务,并最终执行工作。当你运行许多工作者时,任务将在他们之间共享。
简单来说就是解决大量消息被轮流发放处理。
生产者:
/** * 工作队列生产者 * */ public class Sender { public static final String QUEUE_NAME="hello"; public static void main(String[] args) { Channel channel = ChannelUtil.getChannel(); Scanner sc = new Scanner(System.in); while (sc.hasNext()){ String next = sc.next(); try { channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.basicPublish("", QUEUE_NAME,null, next.getBytes() ); System.out.println("工作队列1已经发送消息"+next); }catch (Exception e){ System.out.println("发送异常:"+e.getMessage()); } } } }
开启两个消费者:
/** * 工作队列处理者,处理生产者产生的大量消息 * */ public class Worker { public static final String QUEUE_NAME="hello"; public static void main(String[] args) { new Thread(()->{ Channel channel = ChannelUtil.getChannel(); try { channel.basicConsume(QUEUE_NAME, false, (String consumerTag, Delivery message)->{ System.out.println("队列1:"+new String(message.getBody())); /** * @param1:消息标记,哪个消息应答了 * @param2:取消应答同一信道所有消息 * */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }, (String consumerTag)->{}); }catch (Exception e){ System.out.println("消息消费异常"); } },"线程1").start(); new Thread(()->{ Channel channel = ChannelUtil.getChannel(); try { channel.basicConsume(QUEUE_NAME, false, (String consumerTag, Delivery message)->{ System.out.println("队列2:"+new String(message.getBody())); /** * @param1:消息标记,哪个消息应答了 * @param2:取消应答同一信道所有消息 * */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }, (String consumerTag)->{}); }catch (Exception e){ System.out.println("消息消费异常"); } },"线程2").start(); } }
在手动应答下,当某个消费者在接收消息后出现宕机等突发情况造成消息丢失但由于,这时候rabbitmq没有接收到手动应答的信号,并没有将该消息丢弃而是转发给其他消费者。
将第二个参数durable改为true即可。
channel.queueDeclare(QUEUE_NAME, true,false, false, null); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, next.getBytes() );
默认情况下是轮询分发,某些情况下,某些消费者处理消息比较快,使用不公平分发策略消息处理更快一点。
channel.basicQos(1);
在消费者中提前向其信道中所放的数据数量。指定消费者消费消息数量。
@param prefetchCount maximum number of messages that the server will deliver, 0 if unlimited channel.basicQos(prefetchCount);
未完…