spring-kafka-2.6.1~1. Preface


1. Preface

在基於Kafka的消息傳遞方案中,Spring for Apache Kafka project應用了Spring的經典核心概念。

我們以“template”來作為發送消息的高級抽象,而且我們還提供了Message-driven POJO支持。


2. What’s new?

2.1. What’s New in 2.6 Since 2.5

本節涵蓋了2.5到2.6所做的所有變更。有關先前版本的變更,請參考Change History.


2.1.1. Kafka Client Version

本版需要2.6.0 kafka-clients。


2.1.2. Listener Container Changes

默認的EOSMode當前是BETA。參考Exactly Once Semantics來了解詳細信息。

當前若恢復失敗,則錯誤處理器(error handler,繼承至FailedRecordProcessor)和DefaultAfterRollbackProcessor默認會重置BackOff。

當然,你也可以根據失敗記錄和異常來自由定製BackOff。

參考Seek To Current Container Error Handlers, Recovering Batch Error Handler, Publishing Dead-letter Records和After-rollback Processor來了解詳情。

除此之外,還可以在容器屬性中配置adviceChain。參考Listener Container Properties來了解詳情。


2.1.3. @KafkaLisener Changes

現在當使用手動分區分配時,您可以指定一個通配符來決定哪些分區需要重置為初始偏移。

此外,如果listener實現了ConsumerSeekAware,那麼手動分區後還會調用onPartitionsAssigned()方法(2.5.5版本加入的新功能)。參考Explicit Partition Assignment瞭解詳情。

為方便查找,AbstractConsumerSeekAware中還添加了許多便利方法。

參考Seeking to a Specific Offset來了解詳情。

3. Introduction

參考文檔的第一部分對Spring for Apache Kafka和底層概念進行高層概述,並提供了一些代碼片段來讓您快速上手。

3.1. Quick Tour for the Impatient

下面是Spring Kafka的五分鐘教程。

前提: 必須先安裝、運行Apache Kafka.然後,您必須獲得spring-kafka JAR及其所有依賴項。

最簡單的方法是在構建工具中聲明依賴。

下面是Maven示例:

org.springframework.kafka

spring-kafka

2.6.1

下面是如何在Gradle添加依賴的示例:

compile 'org.springframework.kafka:spring-kafka:2.6.1'

提示

使用Spring Boot時可省略version,因為它能自動匹配正確的spring-kafka版本:

org.springframework.kafka

spring-kafka

下面是Gradle示例:

compile 'org.springframework.kafka:spring-kafka'


3.1.1. Compatibility

該快速教程所需的最小版本如下:

Apache Kafka Clients 2.4.1

Spring Framework 5.3.x

Minimum Java version: 8

3.1.2. A Very, Very Quick Example

如下所示,你可以直接使用Java來收發消息:

@Test public void testAutoCommit() throws Exception {

logger.info("Start auto");

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");

final CountDownLatch latch = new CountDownLatch(4);

containerProps.setMessageListener(new MessageListener() {

@Override public void onMessage(ConsumerRecord message) {

logger.info("received: " + message);

latch.countDown();

}

});

KafkaMessageListenerContainer container = createContainer(containerProps);

container.setBeanName("testAuto");

container.start(); Thread.sleep(1000); // wait a bit for the container to start

KafkaTemplate template = createTemplate();

template.setDefaultTopic("topic1");

template.sendDefault(0, "foo");

template.sendDefault(2, "bar");

template.sendDefault(0, "baz");

template.sendDefault(2, "qux");

template.flush();

assertTrue(latch.await(60, TimeUnit.SECONDS));

container.stop();

logger.info("Stop auto");

}

private KafkaMessageListenerContainer

createContainer( ContainerProperties containerProps) {

Map props = consumerProps();

DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);

KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps);

return container;

}

private KafkaTemplate createTemplate() {

Map senderProps = senderProps();

ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps);

KafkaTemplate template = new KafkaTemplate<>(pf);

return template;

}

private Map consumerProps() {

Map props =

new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ConsumerConfig.GROUP_ID_CONFIG, group);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return props;

}

private Map senderProps() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ProducerConfig.RETRIES_CONFIG, 0);

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return props;

}

3.1.3. With Java Configuration

您也可以使用Java配置來完成上述示例的等價功能。如下所示:

@Autowired

private Listener listener;

@Autowired

private KafkaTemplate template;

@Test

public void testSimple() throws Exception {

template.send("annotated1", 0, "foo");

template.flush();

assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));

}

@Configuration

@EnableKafka

public class Config {

@Bean

ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

return factory;

}

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

... return props;

}

@Bean

public Listener listener() {

return new Listener();

}

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

... return props;

}

@Bean public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate(producerFactory());

}

}

public class Listener {

private final CountDownLatch latch1 = new CountDownLatch(1);

@KafkaListener(id = "foo", topics = "annotated1")

public void listen1(String foo) {

this.latch1.countDown();

}

}

3.1.4. Even Quicker, with Spring Boot

Spring Boot可以讓事情更簡單。

下面的Spring Boot應用程序會向topic發送三條消息,然後再接收消息,最後再停止:

@SpringBootApplication

public class Application implements CommandLineRunner {

public static Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {

SpringApplication.run(Application.class, args).close();

}

@Autowired

private KafkaTemplate template;

private final CountDownLatch latch = new CountDownLatch(3);

@Override

public void run(String... args) throws Exception {

this.template.send("myTopic", "foo1");

this.template.send("myTopic", "foo2");

this.template.send("myTopic", "foo3");

latch.await(60, TimeUnit.SECONDS);

logger.info("All received");

}

@KafkaListener(topics = "myTopic")

public void

listen(ConsumerRecord, ?> cr) throws Exception {

logger.info(cr.toString());

latch.countDown();

}

}

Boot能完成大多數配置。當使用local broker時,只需配置如下屬性:

Example 1. application.properties

spring.kafka.consumer.group-id=foo

spring.kafka.consumer.auto-offset-reset=earliest

之所以需要第1個屬性,是因為我們需要根據組來為消費者分配topic分區。

第2個屬性可確保新的消費組能獲得先前發送的消息,因為容器可能會在發送完成後啟動。


分享到:


相關文章: