Kafka 使用


前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。

1.新建项目

新建项目模块fw-cloud-mq-kafka用来测试Kafka 的生产和消息,本文并不会测试Kafka 的海量数据,仅仅为了Kafka的使用。

Kafka 使用

2.maven 配置

配置中我们主要引入了spring-kafka,这是Spring 为Kafka 提供的工具包

<code><dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
/<dependency>
/<dependencies>
/<code>

3.新建启动类

本项目并没有加入到微服务中,如果有需要,读者可以自行加入,当然如果加入了可以从RESTFUL接口接收数据,再推送到对应的Topic 中。

<code>@SpringBootApplication
public class FwKafkaMqApplication {
public static void main(String[] args) {
SpringApplication.run(FwKafkaMqApplication.class, args);
}
}
/<code>

4.应用配置

<code>server:
port: 8781
spring:
application:
name: fw-cloud-mq-kafka
kafka:

bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 0
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:
group-id: testGroup
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


/<code>

kafka 每个配置对应的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多个,中间,逗号隔开
  • kafka.producer.asks
    可以设置的值为:all, -1, 0, 1
    procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。

    acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
  • kafka.producer.retries
    写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
    当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
  • kafka.producer.batch-size
    每次批量发送消息的数量,produce积累到一定数据,一次发送
  • kafka.producer.key-serializer
    指定消息key编解码方式
  • kafka.producer.value-serializer
    指定消息体的编解码方式
  • kafka.consumer.group-id
    指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
  • kafka.consumer.auto-offset-reset

    smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
  • kafka.consumer.enable-auto-commit
    设置自动提交offset
  • kafka.consumer.auto-commit-interval
    如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  • kafka.consumer.key-deserializer
    指定消息key编解码方式
  • kafka.consumer.value-deserializer
    指定消息体编解码方式
    当然kafka 还有很多配置,我们简单使用这些已经够了

5.新建发送方

发送方需要引入KafkaTemplate,用来发送消息,发送的时候我们指定消息的topic 和内容。并把消息内容打印出来。并把发送类注册为组件。

<code>/**
* @author xuyisu
* @description 发送方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwSender {

@Autowired
private KafkaTemplate<string> kafkaTemplate;

public boolean send(){

String message="Hello World:"+ DateUtil.now();
log.info("FwSender:"+message);
//第一个参数是topic,第二个参数是内容
kafkaTemplate.send("fwcloud",message);
return true;
}
}
/<string>/<code>

6.新建消费方

消费放需要添加一个公共方法并设置@KafkaListener和需要监听的Topic,就可以实现消息的监听并消费了。

<code>/**
* @author xuyisu
* @description 接收方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwReceiver {

@KafkaListener(topics = "fwcloud")
public void onMessage(String message){
log.info(message);
}
}
/<code>

7.启动项目

启动项目前,需要先启动kafka
然后启动单元测试FwSenderTest发送消息
通过控制台我们看推送的消息

<code>2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34

2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
/<code>


然后我们再看一下消费的信息

<code>2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
/<code>


关于微服务相关的学习,作者写了一本很详细的教程,可以点击下方链接,教程里面包含详细的代码地址,欢迎star,fork


分享到:


相關文章: