Integrating Storm with Kafka: Environment Setup, How It Works, Feature Development, and Testing

Introduction

We have already covered the basics of Kafka and of Storm in earlier posts; if you are not familiar with either, please read those articles first. This post is hands-on, with little theory. If you have any questions, leave a comment below and we will reply as soon as possible. The complete code example is linked at the end of the article.

I. Starting the Kafka Environment

Version selection matters: with mismatched versions the setup will run into all sorts of problems. Note that this article has only been verified against the versions below; test any other combination yourself.

ZooKeeper: 3.4.12

Kafka: 0.9.0.1

Storm: 1.1.1

1. Start the ZooKeeper environment

zkServer.sh start

2. Start the Kafka environment

cd /usr/local/kafka_2.11-0.9.0.1

bin/kafka-server-start.sh -daemon config/server-1.properties

3. Check the processes to confirm everything started

[root@jikeh ~]# jps
2322 Jps
2052 ConsoleProducer
1556 QuorumPeerMain # ZooKeeper started successfully
1966 Kafka # Kafka started successfully
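
Optionally, before wiring Storm in, you can smoke-test the broker from plain Java with the same kafka-clients version. This is a minimal sketch, assuming the broker address and topic used later in this article (192.168.199.147:9092, topic jikeh):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address assumed from the spout/bolt configuration below
        props.put("bootstrap.servers", "192.168.199.147:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("jikeh", "hello from the smoke test"));
        producer.close();
    }
}

If a console consumer attached to the jikeh topic prints the message, the broker is reachable.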

II. Integrating Storm with Kafka

1. Kafka Consumer

1) POM dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>1.5.16.RELEASE</spring-boot.version>
    <spring-boot-maven-plugin.version>1.5.16.RELEASE</spring-boot-maven-plugin.version>
    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
    <storm.version>1.1.1</storm.version>
    <kafka.clients.version>0.9.0.1</kafka.clients.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>

    <!-- storm-core ships its own SLF4J binding; exclude it so it does not
         clash with the logging already on the classpath -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- the Kafka spout/bolt; its transitive kafka-clients is excluded so the
         version can be pinned explicitly below -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

2) Kafka Spout: receiving Kafka messages

// This is the same value as the zookeeper.connect parameter in the Kafka
// configuration file; you can copy it from there.
// The ZkStr format is ip:port (e.g. localhost:2181). brokerZkPath is the ZK
// root path under which all topic and partition metadata is stored; by
// default Kafka uses /brokers.
String brokerZkStr = "192.168.199.147:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);

String topic = "jikeh";

// Root path under which consumed offsets are reported
String offsetZkRoot = "/" + topic;

// Id under which this spout stores its offsets; naming it after the
// topology, for example, is a sensible choice
String offsetZkId = UUID.randomUUID().toString();

SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// kafka.api.OffsetRequest.EarliestTime(): read from the start of the topic (i.e. the oldest messages)
// kafka.api.OffsetRequest.LatestTime(): read from the tail of the topic (i.e. only messages newly written to it)
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
KafkaSpout spout = new KafkaSpout(kafkaConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);
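
Two details are worth calling out. First, StringScheme declares a single output field named "str" (StringScheme.STRING_SCHEME_KEY), which is why the bolt in the next step reads the message with tuple.getStringByField("str"). Second, because the code above uses a random UUID as offsetZkId, the spout writes its offsets to a fresh ZooKeeper path on every run and therefore always starts over; if you want consumption to resume after a restart, use a stable id instead, for example:

// A stable id (e.g. the topology name) lets the spout resume from its
// last committed offset instead of starting over on every run.
String offsetZkId = "KafkaConsumerTopology";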

3) Processing the Kafka messages

public class KafkaConsumerBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // "str" is the field name declared by StringScheme
            String message = tuple.getStringByField("str");

            // String message = tuple.getString(0);

            System.out.println("--->" + message);

            this.collector.ack(tuple);

        } catch (Exception e) {
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: it emits nothing, so there are no fields to declare
    }
}
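
As printed, the listings never actually attach this bolt to the topology; step 2) only registers the spout. Presumably the missing wiring, using the builder from that step, is:

builder.setBolt("bolt", new KafkaConsumerBolt()).shuffleGrouping("spout");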

4) Running the Storm job in local mode

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
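
Local mode is only for testing. A common pattern, sketched here as an assumption rather than taken from the original project, is to submit to a real cluster when a topology name is passed on the command line and fall back to LocalCluster otherwise:

Config config = new Config();
if (args != null && args.length > 0) {
    // Submit to a real Storm cluster under the given topology name
    StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
    // Fall back to an in-process cluster for local testing
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
}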

2. Kafka Producer

1) POM

Same as above.

2) The data source

public class MessageSpout extends BaseRichSpout {

    private Fields fields = null;
    private SpoutOutputCollector collector;

    public MessageSpout(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Emit a small batch of test messages, then pause
        for (int i = 0; i < 5; i++) {
            this.collector.emit(new Values("jikeh", "visit--" + i));
        }
        Utils.sleep(2000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.fields);
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }
}
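
The field names passed to the constructor matter: if no TupleToKafkaMapper is set on the KafkaBolt, storm-kafka falls back to FieldNameBasedTupleToKafkaMapper, which by default looks for tuple fields named "key" and "message" (the commented-out line in the next listing shows how to pick different names). A sketch of instantiating the spout under that assumption:

// Assumes the default FieldNameBasedTupleToKafkaMapper field names
MessageSpout spout = new MessageSpout(new Fields("key", "message"));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);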

3) Processing the data and writing it to Kafka

// 2. Write to Kafka
// Set the producer properties.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.147:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// The topic established earlier in this article
String topicName = "jikeh";

KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(topicName))
        // .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"))
        ;
builder.setBolt("bolt", bolt).shuffleGrouping("spout");
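
As with the consumer, the producer topology can be smoke-tested in local mode; the topology name below is illustrative:

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaProducerTopology", config, builder.createTopology());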

Code download: https://gitee.com/jikeh/JiKeHCN-RELEASE.git

Project name: spring-boot-storm-kafka

