Kafka: Integration with Spring Boot
[[category:Kafka]]

== About ==

For Spring projects that work with Apache Kafka, the Spring for Apache Kafka project integrates Kafka messaging with core Spring concepts. It provides a "template" as a high-level abstraction for sending messages, and it also provides support for message-driven POJOs.

=== Dependencies ===

This assumes that Apache Kafka is already installed and running. You also need the spring-kafka JAR and all of its dependencies. The easiest way to get them is to declare a dependency in your build tool:

# Maven:
#: <syntaxhighlight lang="xml">
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.1.RELEASE</version>
</dependency>
</syntaxhighlight>
# Gradle:
#: <syntaxhighlight lang="groovy">
compile 'org.springframework.kafka:spring-kafka:2.4.1.RELEASE'
</syntaxhighlight>

When you use Spring Boot, you can omit the version, and Boot automatically pulls in the version that is compatible with your Boot release:

# Maven:
#: <syntaxhighlight lang="xml">
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
</syntaxhighlight>
# Gradle:
#: <syntaxhighlight lang="groovy">
compile 'org.springframework.kafka:spring-kafka'
</syntaxhighlight>

=== Compatibility ===

This version is compatible with:

* Apache Kafka Clients 2.2.0
* Spring Framework 5.2.x
* Minimum Java version: 8

== A Very, Very Quick Example ==

As the following example shows, you can send and receive messages with plain Java:

<syntaxhighlight lang="java">
package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;

@SpringBootTest
class KafkaTests01 {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private String group = "group01";
    private String topic1 = "topic1";

    @Test
    public void testAutoCommit() throws Exception {
        logger.info("Start auto");

        // Set up the consumer side: a message listener counting down a latch for each record received.
        ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
        final CountDownLatch latch = new CountDownLatch(4);
        containerProps.setMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }

        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start(); // start the consumer
        Thread.sleep(1000); // wait a bit for the container to start

        // Producer side: send four records to the default topic.
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic(topic1);
        template.sendDefault(0, "foo");
        template.sendDefault(2, "bar");
        template.sendDefault(0, "baz");
        template.sendDefault(2, "qux");
        template.flush();

        assertTrue(latch.await(60, TimeUnit.SECONDS));
        container.stop(); // stop the consumer
        logger.info("Stop auto");
    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }

    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

}
</syntaxhighlight>
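The test above only verifies delivery on the consumer side, through the latch. If you also want to confirm that each record reached the broker, KafkaTemplate.send() and sendDefault() return a ListenableFuture<SendResult> that you can inspect. The following is a minimal sketch, not part of the original example, intended to sit inside the test method and reusing the template and logger defined above:

<syntaxhighlight lang="java">
// Extra imports needed for this sketch:
//   import org.springframework.kafka.support.SendResult;
//   import org.springframework.util.concurrent.ListenableFuture;

// sendDefault() returns a future that completes when the broker acknowledges the record.
ListenableFuture<SendResult<Integer, String>> future = template.sendDefault(0, "foo");

// Non-blocking: log the topic/partition/offset on success, the error on failure.
future.addCallback(
        result -> logger.info("sent to {}-{}@{}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> logger.error("send failed", ex));

// Blocking alternative (the test method already declares "throws Exception"):
SendResult<Integer, String> sendResult = future.get(10, TimeUnit.SECONDS);
</syntaxhighlight>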
== With Java Configuration ==

You can do the same work as in the previous example with Spring configuration in Java:

<syntaxhighlight lang="java">
@Autowired
private Listener listener;

@Autowired
private KafkaTemplate<Integer, String> template;

@Test
public void testSimple() throws Exception {
    template.send("annotated1", 0, "foo");
    template.flush();
    assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // "embeddedKafka" refers to an embedded broker used by the original sample (e.g. EmbeddedKafkaBroker);
        // against a real broker, use its address, such as "localhost:9092".
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... (remaining consumer properties elided in the original)
        return props;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... (remaining producer properties elided in the original)
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

}
</syntaxhighlight>

<syntaxhighlight lang="java">
public class Listener {

    // Read by the test shown above; keep the test where this field is accessible
    // (e.g. the same outer class) or expose it through a getter.
    private final CountDownLatch latch1 = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(String foo) {
        this.latch1.countDown();
    }

}
</syntaxhighlight>
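The Listener above receives only the message payload. A @KafkaListener method can also ask for record metadata through @Header parameters. The following sketch is illustrative and not part of the original page (the class name and listener id are made up; the KafkaHeaders constants ship with spring-kafka):

<syntaxhighlight lang="java">
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

public class ListenerWithMetadata {

    // The payload is bound as before; topic, partition, and offset are injected from the record headers.
    @KafkaListener(id = "fooWithHeaders", topics = "annotated1")
    public void listen(String payload,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.printf("received %s from %s-%d@%d%n", payload, topic, partition, offset);
    }

}
</syntaxhighlight>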
== Even Simpler with Spring Boot ==

Spring Boot makes this even simpler. The following Spring Boot application sends three messages to a topic, receives them, and then stops:

<syntaxhighlight lang="java">
package com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("myTopic", "foo1");
        this.template.send("myTopic", "foo2");
        this.template.send("myTopic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }

}
</syntaxhighlight>

Configure "application.properties":

<syntaxhighlight lang="properties">
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
</syntaxhighlight>

Where:

* spring.kafka.consumer.group-id: the consumer group id to use.
* spring.kafka.consumer.auto-offset-reset: earliest ensures that the new consumer group receives the messages we sent earlier, which is convenient for testing (in production, use latest so that only new messages are consumed).
* spring.kafka.listener.missing-topics-fatal: do not fail on startup if a listened-to topic does not exist yet.
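Note that the Boot example never sets a broker address, so the auto-configured producer and consumer use Spring Boot's default of localhost:9092. If your broker runs elsewhere, the standard spring.kafka.bootstrap-servers property points both of them at it; a minimal sketch (the address below is only an example):

<syntaxhighlight lang="properties">
# Example only: replace with your actual broker address(es).
spring.kafka.bootstrap-servers=localhost:9092
</syntaxhighlight>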