RabbitMQ:Java(关键代码)
跳到导航
跳到搜索
关于
参考:RabbitMQ Tutorials
Maven 依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> ... ... </dependencies>
关键代码
connection
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection();
channel
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel();
- channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
exchange、queue、binding
/** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); /** * 临时队列 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, "");
消息发送
for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); }
- “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
消息接收
/** * 定义消费方法 */ DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); /** * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 结束"); * // 手动确认消息 * // DeliveryTag:唯一地标识通道上的传递 * // false:是否批量确认 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } */ }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 * 4、 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
/** * 定义消费方法 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); // 消息内容 String message = new String(body, "utf-8"); System.out.println(message); } }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
- 使用额外的 DeliverCallback 接口来缓冲服务器推送给我们的消息。
- 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)