SpringMVC整合Kafka實戰

1.SpringMVC整合生產者(Producer)

SpringMVC整合生產者比較簡單,我直接用一個單例對象來管理生產者,這樣保證生產者也是一個單例對象。

1.1 導入kafka的maven依賴

SpringMVC整合Kafka實戰

SpringMVC整合Kafka實戰

1.2 創建單例對象管理生產者

我使用靜態內部類的方式創建單例對象,保證單例對象的線程安全。直接上代碼

<code>public class KafkaManager {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaManager.class);

private static final String BOOTSTRAP_SERVERS = Config.getMqConfig("kafka.bootstrap.servers");


public static final String TOPIC_LOGIN = "Topic_login";
public static final String TOPIC_OLD_API = "Topic_api_use";
public static final String TOPIC_COURSE_VISIT = "Topic_course_visit";

private boolean kafkaAvailable = true;

private KafkaProducer<string> producer;

private KafkaManager() {
this.initManger();
}
private static class SingletonManager {
private static final KafkaManager singletonObj = new KafkaManager();
}

public static KafkaManager getInstance() {
return KafkaManager.SingletonManager.singletonObj;
}

private void initManger() {
try {
Properties producerProperties = new Properties();
//設置接入點,請通過控制檯獲取對應 Topic 的接入點
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//消息隊列 Kafka 消息的序列化方式
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//請求的最長等待時間

producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
this.producer = new KafkaProducer<string>(producerProperties);

} catch (Exception ex) {
LOGGER.error("創建kafka失敗",ex);
this.kafkaAvailable = false;
}
}

protected void finalize () {
if (null != this.producer) {
this.producer.close();
}
}

public boolean sendMessage(String topic, JSONObject body) {
if (false == this.kafkaAvailable) {
return false;
}
boolean succeed = true;
try {
ProducerRecord<string> kafkaMessage = new ProducerRecord<string>(topic, body.toJSONString());

//發送消息,並獲得一個 Future 對象
Future<recordmetadata> metadataFuture = this.producer.send(kafkaMessage);

} catch (Exception e) {
LOGGER.error("發送kafka失敗",e);
succeed = false;
}
return succeed;
}

}/<recordmetadata>/<string>/<string>/<string>/<string>/<code>

1.3 生產者發送消息

SpringMVC整合Kafka實戰

SpringMVC整合Kafka實戰


2.SpringMVC整合消費者(Consumer)

相對於生產者,SpringMVC整合消費者就顯得複雜一點。

2.1 引入kafka消費者的maven依賴

<code><dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>0.10.0.0/<version>
/<dependency>


<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
<version>1.1.1.RELEASE/<version>
/<dependency>/<code>

如上所示,除了kafka客戶端的依賴外還需要spring-kafka的依賴。

這裡說明一下,我使用Spring配置文件創建bean的方式來管理kafka的消費者。當然還有其他方式,這裡我選了比較簡單的一種,其他方式可以自行百度一下。

2.2 創建監聽類

<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class TopicApiUseKafkaListener implements MessageListener<string> {

private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class);

private static final String TABLE_PRE = "";

@Override
public void onMessage(ConsumerRecord<string> stringStringConsumerRecord) {
IApiUseService apiUseService = SpringContextUtil.getBean("apiUseService");
List<apiuse> dataList = new ArrayList<>();
String tableName = TABLE_PRE + DateUtil.formatDate(new Date(),"yyyyMM");
String value = stringStringConsumerRecord.value();
JSONObject bodyJson = JSONObject.parseObject(value);

Integer platform = bodyJson.getInteger("platform");
String url = bodyJson.getString("url");
String apiVersion = bodyJson.getString("apiVersion");
String createDate = bodyJson.getString("createDate");

if (!StringUtils.isEmpty(url) && null != platform && !StringUtils.isEmpty(createDate) && !StringUtils.isEmpty(apiVersion)) {

ApiUse item = new ApiUse();
item.setPlatform(platform);
item.setUrl(url);
item.setApiVersion(apiVersion);
item.setCreateDate(createDate);
dataList.add(item);

}
if (dataList.size() > 0) {
try {
apiUseService.addApiUse(tableName, dataList);
} catch (Exception ex1) {
LOGGER.error("api使用插入錯誤",ex1);
}
}
}
}
/<apiuse>/<string>/<string>/<code>

2.3 配置監聽類

kafka的監聽類的配置需要配置在dispatcher-servlet.xml文件中,配置在spring-context.xml文件中是無效的,這是由Spring的加載機制決定的。

先定義consumer的參數再創建一個全局的consumerFactory的bean對象,如下圖所示。這裡需要注意enable.auto.commit這個參數,這是kafka自動提交的參數,如果設置成true就有kafka服務端來自動提交,但是由於網絡等因素kafka服務端自動提交有時候會失敗導致重複消費消息。所以這裡設置成false,雖然設置成false但是實際上不需要開發者來手動提交,提交的操作是由Spring-kafka這個sdk類處理的,有興趣的童鞋可以看一下spring-kafka的源碼就知道了。

SpringMVC整合Kafka實戰

SpringMVC整合Kafka實戰

接下來創建實際執行消費的類,就是我們剛才創建的那個類。

SpringMVC整合Kafka實戰

SpringMVC整合Kafka實戰

如上圖所示,原則上每個topic就要創建一個對應的監聽類,然後再配置監聽類需要監聽的topic。

內容就到這裡。更多精彩文章敬請期待。


分享到:


相關文章: