使用Spring Request-Reply實現基於Kafka的同步請求響應

使用Spring Request-Reply實現基於Kafka的同步請求響應

大家提到Kafka時第一印象就是它是一個快速的異步消息處理系統,不同於通常tomcat之類應用服務器和前端之間的請求/響應方式請求,客戶端發出一個請求,必然會等到一個響應,這種方式對Kafka來說並不自然,Kafka是一種事件驅動方式,事件激活然後響應,這種方式對很多人接受起來不方便,為了實現請求 - 響應模型,開發人員必須在消息的生產者記錄中構建相關ID系統,並將其與消息的消費者記錄中的ID進行匹配,找到那個請求ID再使用Kafka的一個隊列進行回覆。

隨著Spring-Kafka最新版本推出(Spring replying kafka 模板),這種請求-響應語義現在已經變成現成的了,本案例例演示了Spring-Kafka實現的簡單性。

下圖是本案例的演示架構圖,這個案例是以同步行為返回兩個數字總和的結果。

客戶端 --->請求---> RESTcontroll ---> Spring replying kafka 模板 -->Kafka的請求主題 -->Spring Kafka監聽器

| |

|

下面我們開始看看開發這個演示步驟:

設置Springboot啟動類

首先需要在pom.xml引入Spring kafka模板:

<dependency>

<groupId>org.springframework.kafkagroupId>

<artifactId>spring-kafkaartifactId>

dependency>

代碼如下:

@SpringBootApplication

public class RequestReplyKafkaApplication {

public static void main(String[] args) {

SpringApplication.run(RequestReplyKafkaApplication.class

, args);

}

}

設置Spring ReplyingKafkaTemplate

我們需要在Springboot配置類的KafkaConfig對Spring kafka模板進行配置:

@Configuration

public class KafkaConfig {

在這個配置類中,我們需要配置核心的ReplyingKafkaTemplate類,這個類繼承了 KafkaTemplate 提供請求/響應的的行為;還有一個生產者工廠(參見 ProducerFactory 下面的代碼)和 KafkaMessageListenerContainer。這是最基本的設置,因為請求響應模型需要對應到消息生產者和消費者的行為。

// 這是核心的ReplyingKafkaTemplate

@Bean

public ReplyingKafkaTemplate replyKafkaTemplate(ProducerFactory pf, KafkaMessageListenerContainer container) {

return new ReplyingKafkaTemplate<>(pf, container);

}

// 配件:監聽器容器Listener Container to be set up in ReplyingKafkaTemplate

@Bean

public KafkaMessageListenerContainer replyContainer(ConsumerFactory cf) {

ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);

return new KafkaMessageListenerContainer<>(cf, containerProperties);

}

// 配件:生產者工廠Default Producer Factory to be used in ReplyingKafkaTemplate

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

// 配件:kafka生產者的Kafka配置Standard KafkaProducer settings - specifying brokerand serializer

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

bootstrapServers);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

return props;

}

設置spring-Kafka的監聽器

這與通常創建的Kafka消費者相同。唯一的變化是額外是在工廠中設置ReplyTemplate,這是必須的,因為消費者需要將計算結果放入到Kafka的響應主題。

//消費者工廠 Default Consumer Factory

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Model.class));

}

// 併發監聽器容器Concurrent Listner container factory

@Bean

public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// NOTE - set up of reply template 設置響應模板

factory.setReplyTemplate(kafkaTemplate());

return factory;

}

// Standard KafkaTemplate

@Bean

public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

編寫我們的kafka消費者

這是過去創建的Kafka消費者一樣。唯一的變化是附加了@SendTo註釋,此註釋用於在響應主題上返回業務結果。

@KafkaListener(topics = "${kafka.topic.request-topic}")

@SendTo

public

Model listen(Model request) throws InterruptedException {

int sum = request.getFirstNumber() + request.getSecondNumber();

request.setAdditionalProperty("sum", sum);

return request;

}

這個消費者用於業務計算,把客戶端通過請求傳入的兩個數字進行相加,然後返回這個請求,通過@SendTo發送到Kafka的響應主題。

總結服務

現在,讓我們將所有這些都結合在一起放在RESTcontroller,步驟分為幾步,先創建生產者記錄,並在記錄頭部中設置接受響應的Kafka主題,這樣

把請求和響應在Kafka那裡對應起來,然後通過模板發佈消息到Kafka,再通過future.get()堵塞等待Kafka的響應主題發送響應結果過來。這時再

打印結果記錄中的頭部信息,會看到Spring自動生成相關ID。

@ResponseBody

@PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)

public Model sum(@RequestBody Model request)throws InterruptedException,ExecutionException {

//創建生產者記錄

ProducerRecord record = new ProducerRecord(requestTopic,request);

//在記錄頭部中設置響應主題

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

//發佈到kafka主題中

RequestReplyFuture sendAndReceive = kafkaTemplate.sendAndReceive(record);

//確認生產者是否成功生產

SendResult sendResult = sendAndReceive.getSendFuture().get();

//打印結果記錄中所有頭部信息 會看到Spring自動生成的相關ID,這個ID是由消費端@SendTo 註釋返回的值。

sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));

//獲取消費者記錄

ConsumerRecord consumerRecord = sendAndReceive.get();

//返回消費者結果

return consumerRecord.value();

}

併發消費者

即使你要創建請求主題在三個分區中,三個併發的消費者的響應仍然合併到一個Kafka響應主題,這樣,Spring偵聽器的容器能夠完成匹配相關ID的繁重工作。

整個請求/響應的模型是一致的。

現在我們可以再修改啟動類如下:

@ComponentScan(basePackages = {

"com.gauravg.config",

"com.gauravg.consumer",

"com.gauravg.controller",

"com.gauravg.model"

})

@SpringBootApplication

public class RequestReplyKafkaApplication {

public static void main(String[] args) {

SpringApplication.run(RequestReplyKafkaApplication.class, args);

}

}

下面開始運行這個案例:

1.先啟動kafka

2.直接運行上面啟動類

3.通過postman等工具訪問:

http://localhost:8080/sum

post數據:

{

"firstNumber": "111",

"secondNumber": "2222"

}

返回結果是:

{

"firstNumber": 111,

"secondNumber": 2222,

"sum": 2333

}

在控制檯輸出記錄頭部信息:

kafka_replyTopic:[B@1f59b198

kafka_correlationId:[B@356a7326

__TypeId__:[B@1a9111f

可見,Spring自動生成聚合ID(correlationId),無需我們自己手工比對了。


分享到:


相關文章: