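Integrating Storm with Kafka: Environment Setup, How It Works, Development, and Testing

Introduction

Earlier posts covered the basics of Kafka and of Storm; if you are not familiar with either, please read those posts first. This post is hands-on and light on theory. If you have any questions, leave a comment below and I will reply as soon as I can. A link to the complete code sample is at the end of the post.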

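I. Starting the Kafka environment

Choosing versions: the versions matter, because mismatched versions cause many problems while setting up the environment. This post has only been verified against the versions below; test any other combination yourself.

ZooKeeper: 3.4.12

Kafka: 0.9.0.1

Storm: 1.1.1

1. Start ZooKeeper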

zkServer.sh start

2. Start Kafka

cd /usr/local/kafka_2.11-0.9.0.1

bin/kafka-server-start.sh -daemon config/server-1.properties

3. Check the processes to confirm that both services started

[root@jikeh ~]# jps
2322 Jps
2052 ConsoleProducer
1556 QuorumPeerMain # ZooKeeper is running
1966 Kafka # Kafka is running
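
jps only shows that the JVM processes exist, not that they accept connections. As a complementary check, here is a minimal sketch in plain Java that tries to open a TCP connection to each service. It assumes the ports used later in this post (2181 for ZooKeeper, 9092 for Kafka); PortProbe and isListening are names made up for this illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Probes TCP ports to check that ZooKeeper and Kafka accept connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: both services run on the same host; pass the host as the first argument
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("ZooKeeper (2181) listening: " + isListening(host, 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening(host, 9092, 1000));
    }
}
```

Run it on the broker host, or pass the broker address as the first argument, for example java PortProbe 192.168.199.147.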

II. Integrating Storm with Kafka

1. Kafka consumer

1) pom dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>1.5.16.RELEASE</spring-boot.version>
    <spring-boot-maven-plugin.version>1.5.16.RELEASE</spring-boot-maven-plugin.version>
    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
    <storm.version>1.1.1</storm.version>
    <kafka.clients.version>0.9.0.1</kafka.clients.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

2) Kafka spout: receiving Kafka messages

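import java.util.UUID;

import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

// ZooKeeper connection string in ip:port form (for example localhost:2181). It is the same value as
// zookeeper.connect in the Kafka broker configuration, so you can copy it from there.
// ZkHosts also accepts a brokerZkPath, the ZooKeeper root path under which Kafka stores all topic
// and partition information; by default Kafka uses /brokers.
String brokerZkStr = "192.168.199.147:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);

String topic = "jikeh";

// ZooKeeper root path under which the spout reports its offsets
String offsetZkRoot = "/" + topic;

// Id under which this spout's consumer offsets are stored. A random UUID gives every run a new id,
// so offsets committed by an earlier run are never picked up; use a fixed value such as the
// topology name if the spout should resume where it left off.
String offsetZkId = UUID.randomUUID().toString();

SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
// Decode each message as a string, emitted in a single field named "str"
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (the oldest message)
// kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (only newly written messages)
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
KafkaSpout spout = new KafkaSpout(kafkaConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);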
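
To make the roles of the offset root path and the spout id more concrete, here is a small sketch in plain Java (no Storm classes). As far as I understand storm-kafka 1.x, the spout commits the offset of each partition to a ZooKeeper node of the form root/id/partition_N; treat that layout as an assumption and confirm it with zkCli.sh on your own cluster. OffsetPaths and committedPath are names invented for this illustration.

```java
import java.util.UUID;

/** Sketch of where the storm-kafka spout is assumed to keep its offsets in ZooKeeper. */
final class OffsetPaths {

    /** Builds zkRoot/spoutId/partition_N, the assumed path of the committed-offset node. */
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        String zkRoot = "/jikeh";

        // A fixed id (here the topology name) maps to the same node on every run,
        // so a restarted topology can find the offset it committed earlier.
        System.out.println(committedPath(zkRoot, "KafkaConsumerTopology", 0));

        // A random id maps to a different node on every run, so nothing is ever resumed
        // and reading always starts at startOffsetTime.
        System.out.println(committedPath(zkRoot, UUID.randomUUID().toString(), 0));
    }
}
```

With a fixed id the first line is always /jikeh/KafkaConsumerTopology/partition_0, while the second line changes on every run. That difference is why a stable id is needed when the spout should resume after a restart.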

3) Processing Kafka messages

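import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class KafkaConsumerBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // StringScheme emits each Kafka message in a single field named "str"
            String message = tuple.getStringByField("str");

            // Equivalent access by position:
            // String message = tuple.getString(0);

            System.out.println("--->" + message);

            // Acknowledge the tuple so that the spout treats the message as processed
            this.collector.ack(tuple);

        } catch (Exception e) {
            // Fail the tuple so that the spout can replay the message
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt is the last step of the topology and emits nothing
    }
}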

4) Running the Storm job in local mode

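// Attach the bolt from step 3 so that it receives the spout's tuples (the id "bolt" is arbitrary)
builder.setBolt("bolt", new KafkaConsumerBolt()).shuffleGrouping("spout");
Config config = new Config();
// Run the topology inside the current JVM
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());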

2. Kafka producer

1) pom

Same as above.

2) Data source

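import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class MessageSpout extends BaseRichSpout {

    // Names of the output fields, supplied by the code that builds the topology
    private Fields fields = null;
    private SpoutOutputCollector collector;

    public MessageSpout(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Emit five two-value tuples, then pause for two seconds
        for (int i = 0; i < 5; i++) {
            this.collector.emit(new Values("jikeh", "visit--" + i));
        }
        Utils.sleep(2000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.fields);
    }

    @Override
    public void ack(Object o) {
        // Never called here: the tuples are emitted without a message id
    }

    @Override
    public void fail(Object o) {
        // Never called here: the tuples are emitted without a message id
    }
}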

3) Processing the data and writing it to Kafka

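import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;

// 2. Write to Kafka
// Set the producer properties.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.147:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// topicName is the target topic, for example the "jikeh" topic read by the consumer in part 1
KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(topicName))
        // Without an explicit mapper, KafkaBolt reads the tuple fields "key" and "message".
        // Uncomment the next line to use other field names, here "key" and "value":
        // .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"))
        ;
builder.setBolt("bolt", bolt).shuffleGrouping("spout");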
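
One detail that is easy to get wrong here is how the field names declared by MessageSpout line up with what KafkaBolt looks for. To my knowledge, the default mapper in storm-kafka 1.x reads the record key from a field named key (using a null key if that field is absent) and the record value from a field named message. The sketch below imitates that lookup with plain Java collections so that it runs without Storm; FieldNameMapperSketch is a stand-in written for this post, not the Storm class, and the behavior it models is an assumption to verify against your Storm version.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for a field-name-based tuple-to-Kafka mapper; not the Storm class itself. */
final class FieldNameMapperSketch {

    private final String keyField;
    private final String messageField;

    FieldNameMapperSketch(String keyField, String messageField) {
        this.keyField = keyField;
        this.messageField = messageField;
    }

    /** Pairs declared field names with emitted values, the way a tuple does. */
    static Map<String, Object> tuple(List<String> fields, List<?> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("field count and value count differ");
        }
        Map<String, Object> tuple = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            tuple.put(fields.get(i), values.get(i));
        }
        return tuple;
    }

    /** The key is optional: a tuple without the key field yields a null key. */
    Object keyOf(Map<String, Object> tuple) {
        return tuple.get(keyField);
    }

    /** The message is required: a tuple without the message field is an error. */
    Object messageOf(Map<String, Object> tuple) {
        if (!tuple.containsKey(messageField)) {
            throw new IllegalArgumentException("tuple has no field named " + messageField);
        }
        return tuple.get(messageField);
    }

    public static void main(String[] args) {
        // The spout declares ("key", "message") and emits ("jikeh", "visit--0")
        Map<String, Object> tuple =
                tuple(Arrays.asList("key", "message"), Arrays.asList("jikeh", "visit--0"));

        FieldNameMapperSketch mapper = new FieldNameMapperSketch("key", "message");
        System.out.println(mapper.keyOf(tuple) + " -> " + mapper.messageOf(tuple));
    }
}
```

Under these assumptions, creating the spout with new Fields("key", "message") works with the default mapper, whereas a spout that declares ("key", "value") needs the explicit mapper shown in the commented-out line.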

Code download: https://gitee.com/jikeh/JiKeHCN-RELEASE.git

Project name: spring-boot-storm-kafka