1. Preface
在基於Kafka的消息傳遞方案中,Spring for Apache Kafka project應用了Spring的經典核心概念。
我們以“template”來作為發送消息的高級抽象,而且我們還提供了Message-driven POJO支持。
2. What’s new?
2.1. What’s New in 2.6 Since 2.5
本節涵蓋了2.5到2.6所做的所有變更。有關先前版本的變更,請參考Change History.
2.1.1. Kafka Client Version
本版需要2.6.0 kafka-clients。
2.1.2. Listener Container Changes
默認的EOSMode當前是BETA。參考Exactly Once Semantics來了解詳細信息。
當前若恢復失敗,則錯誤處理器(error handler,繼承至FailedRecordProcessor)和DefaultAfterRollbackProcessor默認會重置BackOff。
當然,你也可以根據失敗記錄和異常來自由定製BackOff。
參考Seek To Current Container Error Handlers, Recovering Batch Error Handler, Publishing Dead-letter Records和After-rollback Processor來了解詳情。
除此之外,還可以在容器屬性中配置adviceChain。參考Listener Container Properties來了解詳情。
2.1.3. @KafkaLisener Changes
現在當使用手動分區分配時,您可以指定一個通配符來決定哪些分區需要重置為初始偏移。
此外,如果listener實現了ConsumerSeekAware,那麼手動分區後還會調用onPartitionsAssigned()方法(2.5.5版本加入的新功能)。參考Explicit Partition Assignment瞭解詳情。
為方便查找,AbstractConsumerSeekAware中還添加了許多便利方法。
參考Seeking to a Specific Offset來了解詳情。
3. Introduction
參考文檔的第一部分對Spring for Apache Kafka和底層概念進行高層概述,並提供了一些代碼片段來讓您快速上手。
3.1. Quick Tour for the Impatient
下面是Spring Kafka的五分鐘教程。
前提: 必須先安裝、運行Apache Kafka.然後,您必須獲得spring-kafka JAR及其所有依賴項。
最簡單的方法是在構建工具中聲明依賴。
下面是Maven示例:
org.springframework.kafka
spring-kafka
2.6.1
下面是如何在Gradle添加依賴的示例:
compile 'org.springframework.kafka:spring-kafka:2.6.1'
提示
使用Spring Boot時可省略version,因為它能自動匹配正確的spring-kafka版本:
org.springframework.kafka
spring-kafka
下面是Gradle示例:
compile 'org.springframework.kafka:spring-kafka'
3.1.1. Compatibility
該快速教程所需的最小版本如下:
Apache Kafka Clients 2.4.1
Spring Framework 5.3.x
Minimum Java version: 8
3.1.2. A Very, Very Quick Example
如下所示,你可以直接使用Java來收發消息:
@Test public void testAutoCommit() throws Exception {
logger.info("Start auto");
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new MessageListener() {
@Override public void onMessage(ConsumerRecord message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start(); Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
logger.info("Stop auto");
}
private KafkaMessageListenerContainer
createContainer( ContainerProperties containerProps) {Map props = consumerProps();
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
private KafkaTemplate createTemplate() {
Map senderProps = senderProps();
ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps);
KafkaTemplate template = new KafkaTemplate<>(pf);
return template;
}
private Map consumerProps() {
Map props =
new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
private Map senderProps() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
3.1.3. With Java Configuration
您也可以使用Java配置來完成上述示例的等價功能。如下所示:
@Autowired
private Listener listener;
@Autowired
private KafkaTemplate template;
@Test
public void testSimple() throws Exception {
template.send("annotated1", 0, "foo");
template.flush();
assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
... return props;
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
... return props;
}
@Bean public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
public class Listener {
private final CountDownLatch latch1 = new CountDownLatch(1);
@KafkaListener(id = "foo", topics = "annotated1")
public void listen1(String foo) {
this.latch1.countDown();
}
}
3.1.4. Even Quicker, with Spring Boot
Spring Boot可以讓事情更簡單。
下面的Spring Boot應用程序會向topic發送三條消息,然後再接收消息,最後再停止:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("myTopic", "foo1");
this.template.send("myTopic", "foo2");
this.template.send("myTopic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "myTopic")
public void
listen(ConsumerRecord, ?> cr) throws Exception {logger.info(cr.toString());
latch.countDown();
}
}
Boot能完成大多數配置。當使用local broker時,只需配置如下屬性:
Example 1. application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
之所以需要第1個屬性,是因為我們需要根據組來為消費者分配topic分區。
第2個屬性可確保新的消費組能獲得先前發送的消息,因為容器可能會在發送完成後啟動。