“RabbitMQ:Java(工作模式)”的版本间差异
		
		
		
		
		
		跳到导航
		跳到搜索
		
				
		
		
	
 (→关键代码)  | 
				|||
| 第145行: | 第145行: | ||
== Hello World ==  | == Hello World ==  | ||
# 生产者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.hello;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import java.nio.charset.StandardCharsets;  | |||
public class Send {  | |||
    private final static String QUEUE_NAME = "hello";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        try (Connection connection = factory.newConnection();  | |||
            Channel channel = connection.createChannel()) {  | |||
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  | |||
            String message = "Hello World!";  | |||
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));  | |||
            System.out.println(" [x] 发送 '" + message + "'");  | |||
        }  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
#* 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable)  | |||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.hello;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import com.rabbitmq.client.DeliverCallback;  | |||
public class Recv {  | |||
    private final static String QUEUE_NAME = "hello";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        Connection connection = factory.newConnection();  | |||
        Channel channel = connection.createChannel();  | |||
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  | |||
        System.out.println(" [*] 等待消息.");  | |||
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] 接收 '" + message + "'");  | |||
        };  | |||
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
#* 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试'''使用来自队列的消息之前确保该队列存在'''。  | |||
#* 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。  | |||
== Work模式 ==  | |||
<pre>  | |||
工作队列是啥?  | |||
<  | 就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。  | ||
</pre>  | |||
相关特性:  | |||
# '''消息确认''':消费者发送通知告诉 MQ 处理完成了;  | |||
# '''消息持久''':遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百);  | |||
# '''循环调度''':若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个;  | |||
# '''公平派遣''':当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。  | |||
# 生产者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.work;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import com.rabbitmq.client.MessageProperties;  | |||
import java.nio.charset.StandardCharsets;  | |||
public class NewTask {  | |||
    private final static String TASK_QUEUE_NAME = "task_queue";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        try (Connection connection = factory.newConnection();  | |||
            Channel channel = connection.createChannel()) {  | |||
            // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。  | |||
            boolean durable = true;  | |||
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);  | |||
            for (int i = 0; i <= 5; i++) {  | |||
                String message = "第" + i + "个消息.";  | |||
                // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。  | |||
                channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));  | |||
                System.out.println(" [x] 发送 '" + message + "'");  | |||
            }  | |||
        }  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
#* 将消息标记为持久性并'''不能完全保证消息不会丢失''':  | |||
#*: 尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。  | |||
#*: 持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“'''publisher confirms'''”。  | |||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.work;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import com.rabbitmq.client.DeliverCallback;  | |||
public class Worker {  | |||
    private final static String TASK_QUEUE_NAME = "task_queue";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        final Connection connection = factory.newConnection();  | |||
        final Channel channel = connection.createChannel();  | |||
        // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true  | |||
        boolean durable = true;  | |||
        channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);  | |||
        System.out.println(" [*] 等待消息.");  | |||
        // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人)  | |||
        channel.basicQos(1);  | |||
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] 接收 '" + message + "'");  | |||
            try {  | |||
                doWork(message);  | |||
            } catch (InterruptedException e) {  | |||
                e.getMessage();  | |||
            } finally {  | |||
                System.out.println(" [x] 结束");  | |||
                // 执行完成,返回确认消息。  | |||
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  | |||
            }  | |||
        };  | |||
        // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行)  | |||
        boolean autoAck = false;  | |||
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });  | |||
    }  | |||
    private static void doWork(String task) throws InterruptedException {  | |||
        for (char ch : task.toCharArray()) {  | |||
            if (ch == '.') Thread.sleep(1000);  | |||
        }  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
<syntaxhighlight lang="  | RabbitMQ 查看当前队列有那些消息还未确定:  | ||
<syntaxhighlight lang="bash" highlight="">  | |||
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged  | |||
Timeout: 60.0 seconds ...  | |||
Listing queues for vhost / ...  | |||
name    messages_ready  messages_unacknowledged  | |||
work    0       3  | |||
hello   0       0  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
(messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息)  | |||
== 发布订阅 ==  | |||
fanout 交换非常简单,它只是'''将接收到的所有消息广播到它知道的所有队列'''。  | |||
不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要'''临时队列''':  | |||
# 一个新的、空的,随机名称的队列;  | |||
# 一旦断开连接,队列就被自动删除。  | |||
在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称:  | |||
<syntaxhighlight lang="Java" highlight="">  | <syntaxhighlight lang="Java" highlight="">  | ||
String queueName = channel.queueDeclare().getQueue();   // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
<syntaxhighlight lang="Java" highlight="">  | # 生产者:  | ||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.publicsubscribe;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
public class EmitLog {  | |||
    private static final String EXCHANGE_NAME = "logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        try (Connection connection = factory.newConnection();  | |||
            Channel channel = connection.createChannel()) {  | |||
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);  | |||
            String message = "info: Hello World!";  | |||
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));  | |||
            System.out.println(" [x] 发送 '" + message + "'");  | |||
        }  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.publicsubscribe;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import com.rabbitmq.client.DeliverCallback;  | |||
public class ReceiveLogs {  | |||
    private static final String EXCHANGE_NAME = "logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        Connection connection = factory.newConnection();  | |||
        Channel channel = connection.createChannel();  | |||
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  | |||
        String queueName = channel.queueDeclare().getQueue();  | |||
        channel.queueBind(queueName, EXCHANGE_NAME, "");  | |||
        System.out.println(" [*] 等待消息.");  | |||
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] 接收 '" + message + "'");  | |||
        };  | |||
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
<syntaxhighlight lang="  | RabbitMQ 查看绑定关系:  | ||
<syntaxhighlight lang="bash" highlight="">  | |||
rabbitmqctl list_bindings  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
== 路由模式 ==  | |||
# 生产者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.routing;  | |||
import com.rabbitmq.client.BuiltinExchangeType;  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
public class EmitLogDirect {  | |||
    private static final String EXCHANGE_NAME = "direct_logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        try (Connection connection = factory.newConnection();  | |||
             Channel channel = connection.createChannel()) {  | |||
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  | |||
            String severity = "warning";  | |||
            String message = "Hello World!";  | |||
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));  | |||
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");  | |||
        }  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
package top.codexu.rabbitmq.routing;  | |||
import com.rabbitmq.client.*;  | |||
public class ReceiveLogsDirect {  | |||
    private static final String EXCHANGE_NAME = "direct_logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        Connection connection = factory.newConnection();  | |||
        Channel channel = connection.createChannel();  | |||
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  | |||
        String queueName1 = channel.queueDeclare().getQueue();  | |||
        String queueName2 = channel.queueDeclare().getQueue();  | |||
        String[] severities;  | |||
        severities = {"info", "warning", "error"};  | |||
        for (String severity : severities1) {  | |||
            channel.queueBind(queueName1, EXCHANGE_NAME, severity);  | |||
        }  | |||
        severities = {"info"};  | |||
        for (String severity : severities2) {  | |||
            channel.queueBind(queueName2, EXCHANGE_NAME, severity);  | |||
        }  | |||
        System.out.println(" [*] 等待消息.");  | |||
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  | |||
        };  | |||
        channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });  | |||
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  | |||
        };  | |||
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
#* 如上,一个 queue 可以绑定多个 routingKey。  | |||
== 主题模式 ==  | |||
# 生产者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
public class EmitLogTopic {  | |||
    private static final String EXCHANGE_NAME = "topic_logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        try (Connection connection = factory.newConnection();  | |||
            Channel channel = connection.createChannel()) {  | |||
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);  | |||
            String[] routingKeys = new String[]{"quick.orange.rabbit",   | |||
												"lazy.orange.elephant",   | |||
												"quick.orange.fox",   | |||
												"lazy.brown.fox",   | |||
												"quick.brown.fox",   | |||
												"quick.orange.male.rabbit",   | |||
												"lazy.orange.male.rabbit"};  | |||
            for(String severity : routingKeys){  | |||
                String message = "From " + severity + " routingKey' s message!";  | |||
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));  | |||
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");  | |||
            }  | |||
        }  | |||
    }  | |||
    //..  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
import com.rabbitmq.client.Channel;  | |||
import com.rabbitmq.client.Connection;  | |||
import com.rabbitmq.client.ConnectionFactory;  | |||
import com.rabbitmq.client.DeliverCallback;  | |||
public class ReceiveLogsTopic {  | |||
    private static final String EXCHANGE_NAME = "topic_logs";  | |||
    public static void main(String[] argv) throws Exception {  | |||
        ConnectionFactory factory = new ConnectionFactory();  | |||
        factory.setHost("localhost");  | |||
        Connection connection = factory.newConnection();  | |||
        Channel channel = connection.createChannel();  | |||
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);  | |||
        String queueName1 = channel.queueDeclare().getQueue();  | |||
        String queueName2 = channel.queueDeclare().getQueue();  | |||
        String[] routingKeys;  | |||
        routingKeys = new String[]{"*.orange.*"};  | |||
        for (String bindingKey : routingKeys) {  | |||
            channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey);  | |||
            System.out.println("ReceiveLogsTopic2 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey);  | |||
        }  | |||
        routingKeys = new String[]{"*.*.rabbit", "lazy.#"};  | |||
        for (String bindingKey : routingKeys) {  | |||
            channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey);  | |||
            System.out.println("ReceiveLogsTopic2 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey);  | |||
        }  | |||
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  | |||
        };  | |||
        channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });  | |||
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {  | |||
            String message = new String(delivery.getBody(), "UTF-8");  | |||
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  | |||
        };  | |||
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });  | |||
    }  | |||
}  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
== 标题文字 ==  | |||
<syntaxhighlight lang="Java" highlight="">  | # 生产者:  | ||
#: <syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
<syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
<syntaxhighlight lang="Java" highlight="">  | # 生产者:  | ||
#: <syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
<syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
<syntaxhighlight lang="Java" highlight="">  | # 生产者:  | ||
#: <syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
# 消费者:  | |||
#: <syntaxhighlight lang="Java" highlight="">  | |||
<syntaxhighlight lang="Java" highlight="">  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
2021年5月25日 (二) 00:21的版本
关于
代码示例来自官网:https://www.rabbitmq.com/getstarted.html
Maven 依赖:
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>
关键代码
- connection:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection();
 - channel:
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel();
- channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
 
 - exchange、queue:
/** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
- 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
- 简单队列:
 - Work模式:
 - 发布订阅:
 - 路由模式:
 - 主题模式:
 
 
 - 消息发送:
for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); }
- “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
 
 - 消息接收:
/** * 定义消费方法,写法一: * * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { * @Override * public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { * long deliveryTag = envelope.getDeliveryTag(); * String exchange = envelope.getExchange(); * // 消息内容 * String message = new String(body, "utf-8"); * System.out.println(message); * } * }; */ /** * 定义消费方法,写法二:(Lambda表达式,手动确认消息) * * DeliverCallback deliverCallback = (consumerTag, delivery) -> { * String message = new String(delivery.getBody(), "UTF-8"); * * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 接收 '" + message + "'"); * // 手动确认消息 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } * }; // 定义消费方法,写法三: DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; /** * 监听队列, , * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
- 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
 
 
Hello World
- 生产者:
package top.codexu.rabbitmq.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } }
- 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable)
 
 - 消费者:
package top.codexu.rabbitmq.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
- 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试使用来自队列的消息之前确保该队列存在。
 - 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。
 
 
Work模式
工作队列是啥? 就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。
相关特性:
- 消息确认:消费者发送通知告诉 MQ 处理完成了;
 - 消息持久:遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百);
 - 循环调度:若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个;
 - 公平派遣:当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。
 
- 生产者:
package top.codexu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.nio.charset.StandardCharsets; public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。 boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); for (int i = 0; i <= 5; i++) { String message = "第" + i + "个消息."; // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } } }
- 将消息标记为持久性并不能完全保证消息不会丢失:
- 尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。
 - 持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“publisher confirms”。
 
 
 - 消费者:
package top.codexu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); System.out.println(" [*] 等待消息."); // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人) channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.getMessage(); } finally { System.out.println(" [x] 结束"); // 执行完成,返回确认消息。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行) boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
 
RabbitMQ 查看当前队列有那些消息还未确定:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
work    0       3
hello   0       0
(messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息)
发布订阅
fanout 交换非常简单,它只是将接收到的所有消息广播到它知道的所有队列。
不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要临时队列:
- 一个新的、空的,随机名称的队列;
 - 一旦断开连接,队列就被自动删除。
 
在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称:
String queueName = channel.queueDeclare().getQueue();   // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg
- 生产者:
package top.codexu.rabbitmq.publicsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "info: Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] 发送 '" + message + "'"); } } }
 - 消费者:
package top.codexu.rabbitmq.publicsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
 
RabbitMQ 查看绑定关系:
rabbitmqctl list_bindings
路由模式
- 生产者:
package top.codexu.rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = "warning"; String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } }
 - 消费者:
package top.codexu.rabbitmq.routing; import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] severities; severities = {"info", "warning", "error"}; for (String severity : severities1) { channel.queueBind(queueName1, EXCHANGE_NAME, severity); } severities = {"info"}; for (String severity : severities2) { channel.queueBind(queueName2, EXCHANGE_NAME, severity); } System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } }
- 如上,一个 queue 可以绑定多个 routingKey。
 
 
主题模式
- 生产者:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String[] routingKeys = new String[]{"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"}; for(String severity : routingKeys){ String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } //.. }
 - 消费者:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] routingKeys; routingKeys = new String[]{"*.orange.*"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey); } routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey); } DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } }
 
标题文字
- 生产者:
 - 消费者:
 
- 生产者:
 - 消费者:
 
- 生产者:
 - 消费者: