import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String QUEUE_NAME = "hello";
    public static final String HOST = "47.99.113.73";
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        // try-with-resources closes the channel and connection after publishing,
        // so the producer process can exit instead of hanging on open client threads.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * queueDeclare(queue, durable, exclusive, autoDelete, arguments)
             * queue      the name of the queue
             * durable    true if we are declaring a durable queue (the queue will survive a server restart)
             * exclusive  true if we are declaring an exclusive queue (restricted to this connection)
             * autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
             * arguments  other properties (construction arguments) for the queue
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /*
             * basicPublish(exchange, routingKey, props, body)
             * exchange   the exchange to publish the message to ("" is the default exchange)
             * routingKey the routing key (for the default exchange, the queue name)
             * props      other properties for the message - routing headers etc
             * body       the message body
             * Longer overloads also take:
             * mandatory  true if the 'mandatory' flag is to be set
             * immediate  true if the 'immediate' flag is to be set.
             *            Note that the RabbitMQ server does not support this flag.
             */
            String message = "hello world!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Message sent!");
        }
    }
}
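The consumer below deliberately does not use a try-with-resources statement to close the channel and the connection automatically. As the RabbitMQ tutorial puts it: "By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive."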
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;

public class Consumer {
    public static final String QUEUE_NAME = "hello";
    public static final String HOST = "47.99.113.73";
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        // Left open on purpose: the connection must stay alive while callbacks run.
        Channel channel = factory.newConnection().createChannel();
        /*
         * basicConsume(queue, autoAck, deliverCallback, cancelCallback)
         * queue           the name of the queue
         * autoAck         true if the server should consider messages
         *                 acknowledged once delivered; false if the server should expect
         *                 explicit acknowledgements
         * deliverCallback callback when a message is delivered
         * cancelCallback  callback when the consumer is cancelled
         */
        channel.basicConsume(QUEUE_NAME, true,
                (String consumerTag, Delivery message) -> {
                    // Process the message
                    System.out.println("message:" + new String(message.getBody(), "UTF-8"));
                },
                (String consumerTag) -> {
                    System.out.println("Consumer cancelled: " + consumerTag);
                });
        // basicConsume returns immediately; deliveries arrive asynchronously afterwards.
        System.out.println("Consumer registered, waiting for messages.");
    }
}
Work Queues
The main idea behind Work Queues (also known as Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers, the tasks will be shared between them.
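The sharing of tasks between workers can be sketched without a broker. The example below is only an in-process analogy built on `java.util.concurrent`, not RabbitMQ code: a `LinkedBlockingQueue` stands in for the queue, threads stand in for worker processes, and the names `WorkQueueSketch`, `run`, and the `__STOP__` sentinel are hypothetical. With RabbitMQ the queue would live on the server and each worker would be a separate process calling `basicConsume`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkQueueSketch {
    // Hypothetical sentinel value that tells a worker to stop (not a RabbitMQ concept).
    private static final String STOP = "__STOP__";

    /** Starts workerCount workers over one shared queue and returns the tasks each one handled. */
    public static Map<String, List<String>> run(List<String> tasks, int workerCount)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, List<String>> handled = new ConcurrentHashMap<>();
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < workerCount; i++) {
            String name = "worker-" + i;
            List<String> mine = new ArrayList<>(); // written only by this worker's thread
            handled.put(name, mine);
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String task = queue.take(); // blocks until a task is available
                        if (task.equals(STOP)) {
                            return;
                        }
                        mine.add(task); // "execute" the task
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            workers.add(worker);
            worker.start();
        }

        // Producer side: enqueue every task, then one stop marker per worker.
        for (String task : tasks) {
            queue.put(task);
        }
        for (int i = 0; i < workerCount; i++) {
            queue.put(STOP);
        }
        for (Thread worker : workers) {
            worker.join(); // after join, each worker's list is safely visible here
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> result =
                run(List.of("task-1", "task-2", "task-3", "task-4", "task-5"), 2);
        result.forEach((worker, done) -> System.out.println(worker + " handled " + done));
    }
}
```

Each task is handled by exactly one worker, but which worker gets which task depends on thread scheduling and can differ from run to run. This sketch does not model how a RabbitMQ server decides which consumer receives a message, nor acknowledgements.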