“RabbitMQ:Java(工作模式)”的版本间差异
		
		
		
		
		
		跳到导航
		跳到搜索
		
				
		
		
	
无编辑摘要  | 
				 (→关于)  | 
				||
| 第18行: | 第18行: | ||
</syntaxhighlight>  | </syntaxhighlight>  | ||
: [[File:RabbitMQ:Java示例.png|  | : [[File:RabbitMQ:Java示例.png|200px]]  | ||
== 关键代码 ==  | == 关键代码 ==  | ||
2021年5月24日 (一) 21:29的版本
关于
代码示例来自官网:https://www.rabbitmq.com/getstarted.html
Maven 依赖:
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>
关键代码
- connection:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection();
 - channel:
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel();
- channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
 
 - exchange、queue:
/** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
- 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
- 简单队列:
 - Work模式:
 - 发布订阅:
 - 路由模式:
 - 主题模式:
 
 
 - 消息发送:
for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); }
- “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
 
 - 消息接收:
/** * 定义消费方法,写法一: * * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { * @Override * public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { * long deliveryTag = envelope.getDeliveryTag(); * String exchange = envelope.getExchange(); * // 消息内容 * String message = new String(body, "utf-8"); * System.out.println(message); * } * }; */ /** * 定义消费方法,写法二:(Lambda表达式,手动确认消息) * * DeliverCallback deliverCallback = (consumerTag, delivery) -> { * String message = new String(delivery.getBody(), "UTF-8"); * * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 接收 '" + message + "'"); * // 手动确认消息 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } * }; // 定义消费方法,写法三: DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; /** * 监听队列, , * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
- 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
 
 
Hello World