查看“RabbitMQ:Java(关键代码)”的源代码
←
RabbitMQ:Java(关键代码)
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] [[category:Java]] __TOC__ == 关于 == 参考:'''[https://www.rabbitmq.com/getstarted.html RabbitMQ Tutorials]''' Maven 依赖: : <syntaxhighlight lang="xml" highlight=""> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> ... ... </dependencies> </syntaxhighlight> == 关键代码 == === connection === : <syntaxhighlight lang="Java" highlight=""> ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection(); </syntaxhighlight> === channel === : <syntaxhighlight lang="Java" highlight=""> //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); </syntaxhighlight> * channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。 === exchange'''、'''queue'''、'''binding === : <syntaxhighlight lang="Java" highlight=""> /** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); /** * 临时队列 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, ""); </syntaxhighlight> === 消息发送 === : <syntaxhighlight lang="Java" highlight=""> for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } </syntaxhighlight> * “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange; === 消息接收 === : <syntaxhighlight lang="Java" highlight=""> /** * 定义消费方法 */ DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); /** * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 结束"); * // 手动确认消息 * // DeliveryTag:唯一地标识通道上的传递 * // false:是否批量确认 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } */ }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 * 4、 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); </syntaxhighlight> : <syntaxhighlight lang="Java" highlight=""> /** * 定义消费方法 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); // 消息内容 String message = new String(body, "utf-8"); System.out.println(message); } }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); </syntaxhighlight> * 使用额外的 '''DeliverCallback''' 接口来缓冲服务器推送给我们的消息。 * 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
返回至“
RabbitMQ:Java(关键代码)
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息