<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="zh-Hans-CN">
	<id>http://wiki.eijux.com/index.php?action=history&amp;feed=atom&amp;title=Kafka%EF%BC%9A%E4%B8%8ESpringBoot%E9%9B%86%E6%88%90</id>
	<title>Kafka：与SpringBoot集成 - 版本历史</title>
	<link rel="self" type="application/atom+xml" href="http://wiki.eijux.com/index.php?action=history&amp;feed=atom&amp;title=Kafka%EF%BC%9A%E4%B8%8ESpringBoot%E9%9B%86%E6%88%90"/>
	<link rel="alternate" type="text/html" href="http://wiki.eijux.com/index.php?title=Kafka%EF%BC%9A%E4%B8%8ESpringBoot%E9%9B%86%E6%88%90&amp;action=history"/>
	<updated>2026-04-29T08:56:19Z</updated>
	<subtitle>本wiki上该页面的版本历史</subtitle>
	<generator>MediaWiki 1.38.2</generator>
	<entry>
		<id>http://wiki.eijux.com/index.php?title=Kafka%EF%BC%9A%E4%B8%8ESpringBoot%E9%9B%86%E6%88%90&amp;diff=3597&amp;oldid=prev</id>
		<title>Eijux：​建立内容为“category:Kafka  == 关于 == 对于使用Apache Kafka的Spring项目，我们在Spring核心提供了Kafka消息的集成。提供了公共的接入“模板…”的新页面</title>
		<link rel="alternate" type="text/html" href="http://wiki.eijux.com/index.php?title=Kafka%EF%BC%9A%E4%B8%8ESpringBoot%E9%9B%86%E6%88%90&amp;diff=3597&amp;oldid=prev"/>
		<updated>2021-05-19T17:34:08Z</updated>

		<summary type="html">&lt;p&gt;建立内容为“&lt;a href=&quot;/%E5%88%86%E7%B1%BB:Kafka&quot; title=&quot;分类:Kafka&quot;&gt;category:Kafka&lt;/a&gt;  == 关于 == 对于使用Apache Kafka的Spring项目，我们在Spring核心提供了Kafka消息的集成。提供了公共的接入“模板…”的新页面&lt;/p&gt;
&lt;p&gt;&lt;b&gt;新页面&lt;/b&gt;&lt;/p&gt;&lt;div&gt;[[category:Kafka]]&lt;br /&gt;
&lt;br /&gt;
== 关于 ==&lt;br /&gt;
对于使用Apache Kafka的Spring项目，我们在Spring核心提供了Kafka消息的集成。提供了公共的接入“模板”，作为消息发送的高级抽象层，还为消息的POJO提供支持。&lt;br /&gt;
&lt;br /&gt;
=== 依赖 ===&lt;br /&gt;
您的Apache Kafka已经安装并且运行了。然后，您必须有spring-kafka JAR及其所有依赖项。 &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
最简单的方法是在构建工具中声明一个依赖项：&lt;br /&gt;
# Maven：&lt;br /&gt;
#: &amp;lt;syntaxhighlight lang=&amp;quot;xml&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
&amp;lt;dependency&amp;gt;&lt;br /&gt;
   &amp;lt;groupId&amp;gt;org.springframework.kafka&amp;lt;/groupId&amp;gt;&lt;br /&gt;
   &amp;lt;artifactId&amp;gt;spring-kafka&amp;lt;/artifactId&amp;gt;&lt;br /&gt;
   &amp;lt;version&amp;gt;2.4.1.RELEASE&amp;lt;/version&amp;gt;&lt;br /&gt;
&amp;lt;/dependency&amp;gt;&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
# Gradle：&lt;br /&gt;
#: &amp;lt;syntaxhighlight lang=&amp;quot;xml&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
compile 'org.springframework.kafka:spring-kafka:2.4.1.RELEASE'&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
使用Spring Boot时，如果忽略该版本，则Spring Boot将自动引入与您的Boot版本兼容的正确版本：&lt;br /&gt;
# Maven：&lt;br /&gt;
#: &amp;lt;syntaxhighlight lang=&amp;quot;xml&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
&amp;lt;dependency&amp;gt;&lt;br /&gt;
   &amp;lt;groupId&amp;gt;org.springframework.kafka&amp;lt;/groupId&amp;gt;&lt;br /&gt;
   &amp;lt;artifactId&amp;gt;spring-kafka&amp;lt;/artifactId&amp;gt;&lt;br /&gt;
&amp;lt;/dependency&amp;gt;&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
# Gradle：&lt;br /&gt;
#: &amp;lt;syntaxhighlight lang=&amp;quot;xml&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
compile 'org.springframework.kafka:spring-kafka'&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
=== 兼容性 ===&lt;br /&gt;
适用于以下的版本：&lt;br /&gt;
* Apache Kafka Clients 2.2.0&lt;br /&gt;
* Spring Framework 5.2.x&lt;br /&gt;
* 最小的 Java 版本: 8&lt;br /&gt;
&lt;br /&gt;
== 一个非常非常快速的例子【？？？】 ==&lt;br /&gt;
如下例所示，您可以使用普通Java发送和接收消息：&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
package com.example.kafka;&lt;br /&gt;
&lt;br /&gt;
import org.apache.kafka.clients.consumer.ConsumerConfig;&lt;br /&gt;
import org.apache.kafka.clients.consumer.ConsumerRecord;&lt;br /&gt;
import org.apache.kafka.clients.producer.ProducerConfig;&lt;br /&gt;
import org.apache.kafka.common.serialization.IntegerDeserializer;&lt;br /&gt;
import org.apache.kafka.common.serialization.IntegerSerializer;&lt;br /&gt;
import org.apache.kafka.common.serialization.StringDeserializer;&lt;br /&gt;
import org.apache.kafka.common.serialization.StringSerializer;&lt;br /&gt;
import org.junit.jupiter.api.Test;&lt;br /&gt;
import org.slf4j.Logger;&lt;br /&gt;
import org.slf4j.LoggerFactory;&lt;br /&gt;
import org.springframework.boot.test.context.SpringBootTest;&lt;br /&gt;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;&lt;br /&gt;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;&lt;br /&gt;
import org.springframework.kafka.core.KafkaTemplate;&lt;br /&gt;
import org.springframework.kafka.core.ProducerFactory;&lt;br /&gt;
import org.springframework.kafka.listener.ContainerProperties;&lt;br /&gt;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;&lt;br /&gt;
import org.springframework.kafka.listener.MessageListener;&lt;br /&gt;
&lt;br /&gt;
import java.util.HashMap;&lt;br /&gt;
import java.util.Map;&lt;br /&gt;
import java.util.concurrent.CountDownLatch;&lt;br /&gt;
import java.util.concurrent.TimeUnit;&lt;br /&gt;
&lt;br /&gt;
import static org.junit.jupiter.api.Assertions.assertTrue;&lt;br /&gt;
&lt;br /&gt;
@SpringBootTest&lt;br /&gt;
class KafkaTests01 {&lt;br /&gt;
&lt;br /&gt;
    private Logger logger = LoggerFactory.getLogger(getClass());&lt;br /&gt;
    private String group = &amp;quot;group01&amp;quot;;&lt;br /&gt;
    private String topic1 = &amp;quot;topic1&amp;quot;;&lt;br /&gt;
&lt;br /&gt;
    @Test&lt;br /&gt;
    public void testAutoCommit() throws Exception {&lt;br /&gt;
        logger.info(&amp;quot;Start auto&amp;quot;);&lt;br /&gt;
&lt;br /&gt;
        // 启动消费者&lt;br /&gt;
        ContainerProperties containerProps = new ContainerProperties(&amp;quot;topic1&amp;quot;, &amp;quot;topic2&amp;quot;);&lt;br /&gt;
        final CountDownLatch latch = new CountDownLatch(4);&lt;br /&gt;
        containerProps.setMessageListener(new MessageListener&amp;lt;Integer, String&amp;gt;() {&lt;br /&gt;
            @Override&lt;br /&gt;
            public void onMessage(ConsumerRecord&amp;lt;Integer, String&amp;gt; message) {&lt;br /&gt;
                logger.info(&amp;quot;received: &amp;quot; + message);&lt;br /&gt;
                latch.countDown();&lt;br /&gt;
            }&lt;br /&gt;
        });&lt;br /&gt;
        KafkaMessageListenerContainer&amp;lt;Integer, String&amp;gt; container = createContainer(containerProps);&lt;br /&gt;
        container.setBeanName(&amp;quot;testAuto&amp;quot;);&lt;br /&gt;
        container.start();  // 启动消费者&lt;br /&gt;
&lt;br /&gt;
        Thread.sleep(1000); // wait a bit for the container to start&lt;br /&gt;
&lt;br /&gt;
        // 启动生产者&lt;br /&gt;
        KafkaTemplate&amp;lt;Integer, String&amp;gt; template = createTemplate();&lt;br /&gt;
        template.setDefaultTopic(topic1);&lt;br /&gt;
        template.sendDefault(0, &amp;quot;foo&amp;quot;);&lt;br /&gt;
        template.sendDefault(2, &amp;quot;bar&amp;quot;);&lt;br /&gt;
        template.sendDefault(0, &amp;quot;baz&amp;quot;);&lt;br /&gt;
        template.sendDefault(2, &amp;quot;qux&amp;quot;);&lt;br /&gt;
        template.flush();&lt;br /&gt;
&lt;br /&gt;
        assertTrue(latch.await(60, TimeUnit.SECONDS));&lt;br /&gt;
        container.stop(); // 关闭消费者&lt;br /&gt;
        logger.info(&amp;quot;Stop auto&amp;quot;);&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    private KafkaMessageListenerContainer&amp;lt;Integer, String&amp;gt; createContainer(ContainerProperties containerProps) {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; props = consumerProps();&lt;br /&gt;
        DefaultKafkaConsumerFactory&amp;lt;Integer, String&amp;gt; cf = new DefaultKafkaConsumerFactory&amp;lt;Integer, String&amp;gt;(props);&lt;br /&gt;
        KafkaMessageListenerContainer&amp;lt;Integer, String&amp;gt; container = new KafkaMessageListenerContainer&amp;lt;&amp;gt;(cf, containerProps);&lt;br /&gt;
        return container;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    private KafkaTemplate&amp;lt;Integer, String&amp;gt; createTemplate() {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; senderProps = senderProps();&lt;br /&gt;
        ProducerFactory&amp;lt;Integer, String&amp;gt; pf = new DefaultKafkaProducerFactory&amp;lt;Integer, String&amp;gt;(senderProps);&lt;br /&gt;
        KafkaTemplate&amp;lt;Integer, String&amp;gt; template = new KafkaTemplate&amp;lt;&amp;gt;(pf);&lt;br /&gt;
        return template;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    private Map&amp;lt;String, Object&amp;gt; consumerProps() {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; props = new HashMap&amp;lt;&amp;gt;();&lt;br /&gt;
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &amp;quot;localhost:9092&amp;quot;);&lt;br /&gt;
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);&lt;br /&gt;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);&lt;br /&gt;
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, &amp;quot;100&amp;quot;);&lt;br /&gt;
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, &amp;quot;15000&amp;quot;);&lt;br /&gt;
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);&lt;br /&gt;
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);&lt;br /&gt;
        return props;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    private Map&amp;lt;String, Object&amp;gt; senderProps() {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; props = new HashMap&amp;lt;&amp;gt;();&lt;br /&gt;
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, &amp;quot;localhost:9092&amp;quot;);&lt;br /&gt;
        props.put(ProducerConfig.RETRIES_CONFIG, 0);&lt;br /&gt;
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);&lt;br /&gt;
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);&lt;br /&gt;
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);&lt;br /&gt;
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);&lt;br /&gt;
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);&lt;br /&gt;
        return props;&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
== 使用Java配置【？？？】 ==&lt;br /&gt;
也可以使用Java的Spring配置来完成与上一个示例中相同的效果。&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
@Autowired&lt;br /&gt;
private Listener listener;&lt;br /&gt;
&lt;br /&gt;
@Autowired&lt;br /&gt;
private KafkaTemplate&amp;lt;Integer, String&amp;gt; template;&lt;br /&gt;
&lt;br /&gt;
@Test&lt;br /&gt;
public void testSimple() throws Exception {&lt;br /&gt;
    template.send(&amp;quot;annotated1&amp;quot;, 0, &amp;quot;foo&amp;quot;);&lt;br /&gt;
    template.flush();&lt;br /&gt;
    assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));&lt;br /&gt;
}&lt;br /&gt;
&lt;br /&gt;
@Configuration&lt;br /&gt;
@EnableKafka&lt;br /&gt;
public class Config {&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    ConcurrentKafkaListenerContainerFactory&amp;lt;Integer, String&amp;gt;&lt;br /&gt;
                        kafkaListenerContainerFactory() {&lt;br /&gt;
        ConcurrentKafkaListenerContainerFactory&amp;lt;Integer, String&amp;gt; factory =&lt;br /&gt;
                                new ConcurrentKafkaListenerContainerFactory&amp;lt;&amp;gt;();&lt;br /&gt;
        factory.setConsumerFactory(consumerFactory());&lt;br /&gt;
        return factory;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public ConsumerFactory&amp;lt;Integer, String&amp;gt; consumerFactory() {&lt;br /&gt;
        return new DefaultKafkaConsumerFactory&amp;lt;&amp;gt;(consumerConfigs());&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public Map&amp;lt;String, Object&amp;gt; consumerConfigs() {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; props = new HashMap&amp;lt;&amp;gt;();&lt;br /&gt;
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());&lt;br /&gt;
        ...&lt;br /&gt;
        return props;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public Listener listener() {&lt;br /&gt;
        return new Listener();&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public ProducerFactory&amp;lt;Integer, String&amp;gt; producerFactory() {&lt;br /&gt;
        return new DefaultKafkaProducerFactory&amp;lt;&amp;gt;(producerConfigs());&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public Map&amp;lt;String, Object&amp;gt; producerConfigs() {&lt;br /&gt;
        Map&amp;lt;String, Object&amp;gt; props = new HashMap&amp;lt;&amp;gt;();&lt;br /&gt;
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());&lt;br /&gt;
        ...&lt;br /&gt;
        return props;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Bean&lt;br /&gt;
    public KafkaTemplate&amp;lt;Integer, String&amp;gt; kafkaTemplate() {&lt;br /&gt;
        return new KafkaTemplate&amp;lt;Integer, String&amp;gt;(producerFactory());&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
public class Listener {&lt;br /&gt;
&lt;br /&gt;
private final CountDownLatch latch1 = new CountDownLatch(1);&lt;br /&gt;
&lt;br /&gt;
    @KafkaListener(id = &amp;quot;foo&amp;quot;, topics = &amp;quot;annotated1&amp;quot;)&lt;br /&gt;
    public void listen1(String foo) {&lt;br /&gt;
        this.latch1.countDown();&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
== Spring Boot更简单的方式【？？？】 ==&lt;br /&gt;
Spring Boot可以更加简单。 &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
下面的Spring Boot应用示例将三个消息发送到一个主题，然后接收它们，然后停止：&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
package com.example.kafka.demo03;&lt;br /&gt;
&lt;br /&gt;
import org.apache.kafka.clients.consumer.ConsumerRecord;&lt;br /&gt;
import org.slf4j.Logger;&lt;br /&gt;
import org.slf4j.LoggerFactory;&lt;br /&gt;
import org.springframework.beans.factory.annotation.Autowired;&lt;br /&gt;
import org.springframework.boot.CommandLineRunner;&lt;br /&gt;
import org.springframework.boot.SpringApplication;&lt;br /&gt;
import org.springframework.boot.autoconfigure.SpringBootApplication;&lt;br /&gt;
import org.springframework.kafka.annotation.KafkaListener;&lt;br /&gt;
import org.springframework.kafka.core.KafkaTemplate;&lt;br /&gt;
&lt;br /&gt;
import java.util.concurrent.CountDownLatch;&lt;br /&gt;
import java.util.concurrent.TimeUnit;&lt;br /&gt;
&lt;br /&gt;
@SpringBootApplication&lt;br /&gt;
public class Application implements CommandLineRunner {&lt;br /&gt;
&lt;br /&gt;
    public static Logger logger = LoggerFactory.getLogger(Application.class);&lt;br /&gt;
&lt;br /&gt;
    public static void main(String[] args) {&lt;br /&gt;
        SpringApplication.run(Application.class, args).close();&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @Autowired&lt;br /&gt;
    private KafkaTemplate&amp;lt;String, String&amp;gt; template;&lt;br /&gt;
&lt;br /&gt;
    private final CountDownLatch latch = new CountDownLatch(3);&lt;br /&gt;
&lt;br /&gt;
    @Override&lt;br /&gt;
    public void run(String... args) throws Exception {&lt;br /&gt;
        this.template.send(&amp;quot;myTopic&amp;quot;, &amp;quot;foo1&amp;quot;);&lt;br /&gt;
        this.template.send(&amp;quot;myTopic&amp;quot;, &amp;quot;foo2&amp;quot;);&lt;br /&gt;
        this.template.send(&amp;quot;myTopic&amp;quot;, &amp;quot;foo3&amp;quot;);&lt;br /&gt;
        latch.await(60, TimeUnit.SECONDS);&lt;br /&gt;
        logger.info(&amp;quot;All received&amp;quot;);&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    @KafkaListener(topics = &amp;quot;myTopic&amp;quot;)&lt;br /&gt;
    public void listen(ConsumerRecord&amp;lt;?, ?&amp;gt; cr) throws Exception {&lt;br /&gt;
        logger.info(cr.toString());&lt;br /&gt;
        latch.countDown();&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
配置“application.properties”：&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot; highlight=&amp;quot;&amp;quot;&amp;gt;&lt;br /&gt;
spring.kafka.consumer.group-id=foo&lt;br /&gt;
spring.kafka.consumer.auto-offset-reset=earliest&lt;br /&gt;
spring.kafka.listener.missing-topics-fatal=false&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
其中：&lt;br /&gt;
* spring.kafka.consumer.group-id：指定消费者组id。&lt;br /&gt;
* spring.kafka.consumer.auto-offset-reset：确保新的消费者组能获得我们之前发送的消息，为了测试方便（生产配置latest，只获取最新的消息）。&lt;br /&gt;
* spring.kafka.listener.missing-topics-fatal：监听的topic如果不存在，则不报错&lt;/div&gt;</summary>
		<author><name>Eijux</name></author>
	</entry>
</feed>